summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjaseg <code@jaseg.net>2019-11-30 21:15:57 +0100
committerjaseg <code@jaseg.net>2019-12-01 21:25:04 +0100
commit49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9 (patch)
tree1f34fa964ea79a0a75b753685bc72b580e8c5375
parent3d6155871447ef5c5e4d54c39eb8ff573a3b49e9 (diff)
downloadpython-mpv-49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9.tar.gz
python-mpv-49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9.tar.bz2
python-mpv-49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9.zip
Add stream protocol handling
This allows you to directly feed bytes into mpv without going through a file, FIFO etc. first. The new API is: @player.register_stream_protocol(name) @player.python_stream(name, size) @player.python_stream_catchall See their docstrings for their usage.
-rw-r--r--README.rst18
-rwxr-xr-xmpv-test.py88
-rw-r--r--mpv.py265
3 files changed, 365 insertions, 6 deletions
diff --git a/README.rst b/README.rst
index c609969..e46da2b 100644
--- a/README.rst
+++ b/README.rst
@@ -135,6 +135,24 @@ Playlist handling
print(player.playlist)
player.wait_for_playback()
+Directly feeding mpv data from python
+.....................................
+
+.. code:: python
+
+ #!/usr/bin/env python3
+ import mpv
+
+ player = mpv.MPV()
+ @player.python_stream('foo')
+ def reader():
+ with open('test.webm', 'rb') as f:
+ while True:
+ yield f.read(1024*1024)
+
+ player.play('python://foo')
+ player.wait_for_playback()
+
PyQT embedding
..............
diff --git a/mpv-test.py b/mpv-test.py
index 9e83dbb..3ac8985 100755
--- a/mpv-test.py
+++ b/mpv-test.py
@@ -350,6 +350,91 @@ class KeyBindingTest(MpvTestCase):
self.assertNotIn(b('b'), self.m._key_binding_handlers)
self.assertIn(b('c'), self.m._key_binding_handlers)
+class TestStreams(unittest.TestCase):
+ def test_python_stream(self):
+ handler = mock.Mock()
+
+ m = mpv.MPV()
+ m.register_event_callback(handler)
+
+ @m.python_stream('foo')
+ def foo_gen():
+ with open(TESTVID, 'rb') as f:
+ yield f.read()
+
+ @m.python_stream('bar')
+ def bar_gen():
+ yield b''
+
+ m.play('python://foo')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.EOF, 'error': mpv.ErrorCode.SUCCESS}})
+ handler.reset_mock()
+
+ m.play('python://bar')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}})
+ handler.reset_mock()
+
+ m.play('python://baz')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
+ handler.reset_mock()
+
+ m.play('foo://foo')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
+ handler.reset_mock()
+
+ foo_gen.unregister()
+
+ m.play('python://foo')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
+ handler.reset_mock()
+
+ m.play('python://bar')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}})
+ handler.reset_mock()
+
+ m.terminate()
+
+ def test_custom_stream(self):
+ handler = mock.Mock()
+ fail_mock = mock.Mock(side_effect=ValueError)
+ stream_mock = mock.Mock()
+ stream_mock.seek = mock.Mock(return_value=0)
+ stream_mock.read = mock.Mock(return_value=b'')
+
+ m = mpv.MPV(video=False)
+ m.register_event_callback(handler)
+
+ m.register_stream_protocol('pythonfail', fail_mock)
+
+ @m.register_stream_protocol('pythonsuccess')
+ def open_fn(uri):
+ self.assertEqual(uri, 'pythonsuccess://foo')
+ return stream_mock
+
+ m.play('pythondoesnotexist://foo')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
+ handler.reset_mock()
+
+ m.play('pythonfail://foo')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
+ handler.reset_mock()
+
+ m.play('pythonsuccess://foo')
+ m.wait_for_playback()
+ stream_mock.seek.assert_any_call(0)
+ stream_mock.read.assert_called()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}})
+
+ m.terminate()
+
class TestLifecycle(unittest.TestCase):
def test_create_destroy(self):
thread_names = lambda: [ t.name for t in threading.enumerate() ]
@@ -386,8 +471,7 @@ class TestLifecycle(unittest.TestCase):
m.unregister_event_callback(handler)
handler.assert_has_calls([
mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 6, 'event': None}),
- mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 9, 'event': None}),
- mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 7, 'event': {'reason': 4}}),
+ mock.call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.NOTHING_TO_PLAY}})
], any_order=True)
handler.reset_mock()
diff --git a/mpv.py b/mpv.py
index 7e8e4ae..882a30b 100644
--- a/mpv.py
+++ b/mpv.py
@@ -72,6 +72,14 @@ class ErrorCode(object):
PROPERTY_UNAVAILABLE = -10
PROPERTY_ERROR = -11
COMMAND = -12
+ LOADING_FAILED = -13
+ AO_INIT_FAILED = -14
+ VO_INIT_FAILED = -15
+ NOTHING_TO_PLAY = -16
+ UNKNOWN_FORMAT = -17
+ UNSUPPORTED = -18
+ NOT_IMPLEMENTED = -19
+ GENERIC = -20
EXCEPTION_DICT = {
0: None,
@@ -88,7 +96,17 @@ class ErrorCode(object):
-9: lambda *a: TypeError('Tried to get/set mpv property using wrong format, or passed invalid value', *a),
-10: lambda *a: PropertyUnavailableError('mpv property is not available', *a),
-11: lambda *a: RuntimeError('Generic error getting or setting mpv property', *a),
- -12: lambda *a: SystemError('Error running mpv command', *a) }
+ -12: lambda *a: SystemError('Error running mpv command', *a),
+ -14: lambda *a: RuntimeError('Initializing the audio output failed', *a),
+ -15: lambda *a: RuntimeError('Initializing the video output failed'),
+ -16: lambda *a: RuntimeError('There was no audio or video data to play. This also happens if the file '
+ 'was recognized, but did not contain any audio or video streams, or no '
+ 'streams were selected.'),
+ -17: lambda *a: RuntimeError('When trying to load the file, the file format could not be determined, '
+ 'or the file was too broken to open it'),
+ -18: lambda *a: ValueError('Generic error for signaling that certain system requirements are not fulfilled'),
+ -19: lambda *a: NotImplementedError('The API function which was called is a stub only'),
+ -20: lambda *a: RuntimeError('Unspecified error') }
@staticmethod
def default_error_handler(ec, *args):
@@ -282,14 +300,24 @@ class MpvEventLogMessage(Structure):
'level': self.level.decode('utf-8'),
'text': decoder(self.text).rstrip() }
-class MpvEventEndFile(c_int):
- EOF_OR_INIT_FAILURE = 0
+class MpvEventEndFile(Structure):
+ _fields_ = [('reason', c_int),
+ ('error', c_int)]
+
+ EOF = 0
RESTARTED = 1
ABORTED = 2
QUIT = 3
+ ERROR = 4
+ REDIRECT = 5
+
+ # For backwards-compatibility
+ @property
+ def value(self):
+ return self.reason
def as_dict(self, decoder=identity_decoder):
- return {'reason': self.value}
+ return {'reason': self.reason, 'error': self.error}
class MpvEventScriptInputDispatch(Structure):
_fields_ = [('arg0', c_int),
@@ -305,6 +333,22 @@ class MpvEventClientMessage(Structure):
def as_dict(self, decoder=identity_decoder):
return { 'args': [ self.args[i].decode('utf-8') for i in range(self.num_args) ] }
+StreamReadFn = CFUNCTYPE(c_int64, c_void_p, POINTER(c_char), c_uint64)
+StreamSeekFn = CFUNCTYPE(c_int64, c_void_p, c_int64)
+StreamSizeFn = CFUNCTYPE(c_int64, c_void_p)
+StreamCloseFn = CFUNCTYPE(None, c_void_p)
+StreamCancelFn = CFUNCTYPE(None, c_void_p)
+
+class StreamCallbackInfo(Structure):
+ _fields_ = [('cookie', c_void_p),
+ ('read', StreamReadFn),
+ ('seek', StreamSeekFn),
+ ('size', StreamSizeFn),
+ ('close', StreamCloseFn), ]
+# ('cancel', StreamCancelFn)]
+
+StreamOpenFn = CFUNCTYPE(c_int, c_void_p, c_char_p, POINTER(StreamCallbackInfo))
+
WakeupCallback = CFUNCTYPE(None, c_void_p)
OpenGlCbUpdateFn = CFUNCTYPE(None, c_void_p)
@@ -387,6 +431,8 @@ _handle_func('mpv_wakeup', [],
_handle_func('mpv_set_wakeup_callback', [WakeupCallback, c_void_p], None, errcheck=None)
_handle_func('mpv_get_wakeup_pipe', [], c_int, errcheck=None)
+_handle_func('mpv_stream_cb_add_ro', [c_char_p, c_void_p, StreamOpenFn], c_int, ec_errcheck)
+
_handle_func('mpv_get_sub_api', [MpvSubApi], c_void_p, notnull_errcheck)
_handle_gl_func('mpv_opengl_cb_set_update_callback', [OpenGlCbUpdateFn, c_void_p])
@@ -517,6 +563,40 @@ class _DecoderPropertyProxy(_PropertyProxy):
def __setattr__(self, name, value):
setattr(self.mpv, _py_to_mpv(name), value)
+class GeneratorStream:
+ """Transform a python generator into an mpv-compatible stream object. This only supports size() and read(), and
+ does not support seek(), close() or cancel().
+ """
+
+ def __init__(self, generator_fun, size=None):
+ self._generator_fun = generator_fun
+ self.size = size
+
+ def seek(self, offset):
+ self._read_iter = iter(self._generator_fun())
+ self._read_chunk = b''
+ return 0 # We only support seeking to the first byte atm
+ # implementation in case seeking to arbitrary offsets would be necessary
+ # while offset > 0:
+ # offset -= len(self.read(offset))
+ # return offset
+
+ def read(self, size):
+ if not self._read_chunk:
+ try:
+ self._read_chunk += next(self._read_iter)
+ except StopIteration:
+ return b''
+ rv, self._read_chunk = self._read_chunk[:size], self._read_chunk[size:]
+ return rv
+
+ def close(self):
+ self._read_iter = iter([]) # make next read() call return EOF
+
+ def cancel(self):
+ self._read_iter = iter([]) # make next read() call return EOF
+ # TODO?
+
class MPV(object):
"""See man mpv(1) for the details of the implemented commands. All mpv properties can be accessed as
``my_mpv.some_property`` and all mpv options can be accessed as ``my_mpv['some-option']``.
@@ -565,6 +645,11 @@ class MPV(object):
self._event_handle = _mpv_create_client(self.handle, b'py_event_handler')
self._loop = partial(_event_loop, self._event_handle, self._playback_cond, self._event_callbacks,
self._message_handlers, self._property_handlers, log_handler)
+ self._stream_protocol_cbs = {}
+ self._stream_protocol_frontends = collections.defaultdict(lambda: {})
+ self.register_stream_protocol('python', self._python_stream_open)
+ self._python_streams = {}
+ self._python_stream_catchall = None
if loglevel is not None or log_handler is not None:
self.set_loglevel(loglevel or 'terminal-default')
if start_event_thread:
@@ -1028,6 +1113,87 @@ class MPV(object):
if not self._key_binding_handlers:
self.unregister_message_handler('key-binding')
+ def register_stream_protocol(self, proto, open_fn=None):
+ """ Register a custom stream protocol as documented in libmpv/stream_cb.h:
+ https://github.com/mpv-player/mpv/blob/master/libmpv/stream_cb.h
+
+ proto is the protocol scheme, e.g. "foo" for "foo://" urls.
+
+ This function can either be used with two parameters or it can be used as a decorator on the target
+ function.
+
+ open_fn is a function taking an URI string and returning an mpv stream object.
+ open_fn may raise a ValueError to signal libmpv the URI could not be opened.
+
+ The mpv stream protocol is as follows:
+ class Stream:
+ @property
+ def size(self):
+ return None # unknown size
+ return size # int with size in bytes
+
+ def read(self, size):
+ ...
+ return read # non-empty bytes object with input
+ return b'' # empty byte object signals permanent EOF
+
+ def seek(self, pos):
+ return new_offset # integer with new byte offset. The new offset may be before the requested offset
+ in case an exact seek is inconvenient.
+
+ def close(self):
+ ...
+
+ # def cancel(self): (future API versions only)
+ # Abort a running read() or seek() operation
+ # ...
+
+ """
+
+ def decorator(open_fn):
+ @StreamOpenFn
+ def open_backend(_userdata, uri, cb_info):
+ try:
+ frontend = open_fn(uri.decode('utf-8'))
+ except ValueError:
+ return ErrorCode.LOADING_FAILED
+
+ def read_backend(_userdata, buf, bufsize):
+ data = frontend.read(bufsize)
+ for i in range(len(data)):
+ buf[i] = data[i]
+ return len(data)
+
+ cb_info.contents.cookie = None
+ read = cb_info.contents.read = StreamReadFn(read_backend)
+ close = cb_info.contents.close = StreamCloseFn(lambda _userdata: frontend.close())
+
+ seek, size, cancel = None, None, None
+ if hasattr(frontend, 'seek'):
+ seek = cb_info.contents.seek = StreamSeekFn(lambda _userdata, offx: frontend.seek(offx))
+ if hasattr(frontend, 'size') and frontend.size is not None:
+ size = cb_info.contents.size = StreamSizeFn(lambda _userdata: frontend.size)
+
+ # Future API versions only
+ # if hasattr(frontend, 'cancel'):
+ # cb_info.contents.cancel = StreamCancelFn(lambda _userdata: frontend.cancel())
+
+ # keep frontend and callbacks in memory forever (TODO)
+ frontend._registered_callbacks = [read, close, seek, size, cancel]
+ self._stream_protocol_frontends[proto][uri] = frontend
+ return 0
+
+ if proto in self._stream_protocol_cbs:
+ raise KeyError('Stream protocol already registered')
+ self._stream_protocol_cbs[proto] = [open_backend]
+ _mpv_stream_cb_add_ro(self.handle, proto.encode('utf-8'), c_void_p(), open_backend)
+
+ return open_fn
+
+ if open_fn is not None:
+ decorator(open_fn)
+ return decorator
+
# Convenience functions
def play(self, filename):
"""Play a path or URL (requires ``ytdl`` option to be set)."""
@@ -1043,6 +1209,97 @@ class MPV(object):
``MPV.loadfile(filename, 'append-play')``."""
self.loadfile(filename, 'append', **options)
+ # "Python stream" logic. This is some porcelain for directly playing data from python generators.
+
+ def _python_stream_open(self, uri):
+ """Internal handler for python:// protocol streams registered through @python_stream(...) and
+ @python_stream_catchall
+ """
+ name, = re.fullmatch('python://(.*)', uri).groups()
+
+ if name in self._python_streams:
+ generator_fun, size = self._python_streams[name]
+ else:
+ if self._python_stream_catchall is not None:
+ generator_fun, size = self._python_stream_catchall(name)
+ else:
+ raise ValueError('Python stream name not found and no catch-all defined')
+
+ return GeneratorStream(generator_fun, size)
+
+ def python_stream(self, name=None, size=None):
+ """Register a generator for the python stream with the given name.
+
+ name is the name, i.e. the part after the "python://" in the URI, that this generator is registered as.
+ size is the total number of bytes in the stream (if known).
+
+ Any given name can only be registered once. The catch-all can also only be registered once. To unregister a
+ stream, call the .unregister function set on the callback.
+
+ The generator signals EOF by returning, manually raising StopIteration or by yielding b'', an empty bytes
+ object.
+
+ The generator may be called multiple times if libmpv seeks or loops.
+
+ See also: @mpv.python_stream_catchall
+
+ @mpv.python_stream('foobar')
+ def reader():
+ for chunk in chunks:
+ yield chunk
+ mpv.play('python://foobar')
+ mpv.wait_for_playback()
+ reader.unregister()
+ """
+ def register(cb):
+ if name in self._python_streams:
+ raise KeyError(f'Python stream name "{name}" is already registered')
+ self._python_streams[name] = (cb, size)
+ def unregister():
+ if name not in self._python_streams or\
+ self._python_streams[name][0] is not cb: # This is just a basic sanity check
+ raise RuntimeError('Python stream has already been unregistered')
+ del self._python_streams[name]
+ cb.unregister = unregister
+ return cb
+ return register
+
+ def python_stream_catchall(self, cb):
+ """ Register a catch-all python stream to be called when no name matches can be found. Use this decorator on a
+ function that takes a name argument and returns a (generator, size) tuple (with size being None if unknown).
+
+ An invalid URI can be signalled to libmpv by raising a ValueError inside the callback.
+
+ See also: @mpv.python_stream(name, size)
+
+ @mpv.python_stream_catchall
+ def catchall(name):
+ if not name.startswith('foo'):
+ raise ValueError('Unknown Name')
+
+ def foo_reader():
+ with open(name, 'rb') as f:
+ while True:
+ chunk = f.read(1024)
+ if not chunk:
+ break
+ yield chunk
+ return foo_reader, None
+ mpv.play('python://foo23')
+ mpv.wait_for_playback()
+ catchall.unregister()
+ """
+ if self._python_stream_catchall is not None:
+ raise KeyError('A catch-all python stream is already registered')
+
+ self._python_stream_catchall = cb
+ def unregister():
+ if self._python_stream_catchall is not cb:
+ raise RuntimeError('This catch-all python stream has already been unregistered')
+ self._python_stream_catchall = None
+ cb.unregister = unregister
+ return cb
+
# Property accessors
def _get_property(self, name, decoder=strict_decoder, fmt=MpvFormat.NODE):
out = create_string_buffer(sizeof(MpvNode))