[Pkg-mpd-commits] [python-mpd] 52/91: asyncio: Allow streaming of the `return _parse_objects` commands
Simon McVittie
smcv at debian.org
Sat Feb 24 14:55:35 UTC 2018
This is an automated email from the git hooks/post-receive script.
smcv pushed a commit to branch debian/master
in repository python-mpd.
commit fe93e2a349795a442b68308094da0da761a08c4c
Author: chrysn <chrysn at fsfe.org>
Date: Fri Apr 7 09:19:07 2017 +0200
asyncio: Allow streaming of the `return _parse_objects` commands
---
mpd/asyncio.py | 130 +++++++++++++++++++++++++++++++++++++++++++++++----------
1 file changed, 107 insertions(+), 23 deletions(-)
diff --git a/mpd/asyncio.py b/mpd/asyncio.py
index 4eb42cf..cd35379 100644
--- a/mpd/asyncio.py
+++ b/mpd/asyncio.py
@@ -5,9 +5,9 @@ from mpd.base import HELLO_PREFIX, ERROR_PREFIX, SUCCESS
from mpd.base import MPDClientBase
from mpd.base import MPDClient as SyncMPDClient
from mpd.base import ProtocolError, ConnectionError, CommandError
-from mpd.base import mpd_command_provider
+from mpd.base import mpd_command_provider, mpd_commands
-class CommandResult(asyncio.Future):
+class BaseCommandResult(asyncio.Future):
"""A future that carries its command/args/callback with it for the
convenience of passing it around to the command queue."""
@@ -15,18 +15,60 @@ class CommandResult(asyncio.Future):
super().__init__()
self._command = command
self._args = args
- self.__callback = callback
+ self._callback = callback
+
+class CommandResult(BaseCommandResult):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
self.__spooled_lines = []
def _feed_line(self, line):
"""Put the given line into the callback machinery, and set the result on a None line."""
if line is None:
- self.set_result(self.__callback(self.__spooled_lines))
+ self.set_result(self._callback(self.__spooled_lines))
else:
self.__spooled_lines.append(line)
- async def __aiter__(self):
- raise NotImplementedError("async for x in clientcommand() is work in progress")
+ def _feed_error(self, error):
+ self.set_exception(error)
+
+class CommandResultIterable(BaseCommandResult):
+ """Variant of CommandResult where the underlying callback is an
+ asynchronous` generator, and can thus interpret lines as they come along.
+
+ The result can be used with the aiter interface (`async for`). If it is
+ still used as a future instead, it eventually results in a list.
+
+ Commands used with this CommandResult must use their passed lines not like
+ an iterable (as in the synchronous implementation), but as a asyncio.Queue.
+ Furthermore, they must check whether the queue elements are exceptions, and
+ raise them.
+ """
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.__spooled_lines = asyncio.Queue()
+
+ def _feed_line(self, line):
+ self.__spooled_lines.put_nowait(line)
+
+ _feed_error = _feed_line
+
+ def __await__(self):
+ asyncio.Task(self.__feed_future())
+ return super().__await__()
+
+ async def __feed_future(self):
+ result = []
+ async for r in self:
+ result.append(r)
+ self.set_result(result)
+
+ def __aiter__(self):
+ if self.done():
+ raise RuntimeError("Command result is already being consumed")
+ return self._callback(self.__spooled_lines).__aiter__()
+
@mpd_command_provider
class MPDClient(MPDClientBase):
@@ -56,15 +98,15 @@ class MPDClient(MPDClientBase):
while True:
result = await self.__commandqueue.get()
self._write_command(result._command, result._args)
- try:
- responselines = await self.__read_full_output()
- for l in responselines:
- result._feed_line(l)
- except Exception as e:
- # may be CommandError flying out of __read_fulloutput or any kind of exception from the callback
- result.set_exception(e)
- finally:
- assert result.done(), "Result %s not done after __read_full_output was processed"%(result,)
+ while True:
+ try:
+ l = await self.__read_output_line()
+ except CommandError as e:
+ result._feed_error(e)
+ break
+ result._feed_line(l)
+ if l is None:
+ break
# helper methods
@@ -81,7 +123,7 @@ class MPDClient(MPDClientBase):
"""Wrapper around .__wfile.write that handles encoding."""
self.__wfile.write(text.encode('utf8'))
- # copied stuff from base
+ # copied and subtly modifiedstuff from base
async def __hello(self):
# not catching the timeout error, it's actually pretty adaequate
@@ -121,25 +163,67 @@ class MPDClient(MPDClientBase):
return None
return line
- async def __read_full_output(self):
- """Kind of like SyncMPDClient._read_lines, but without the iteration"""
- result = []
+ async def _parse_objects(self, lines, delimiters=[]):
+ """Like _parse_objects, but waits for lines"""
+ obj = {}
while True:
- line = await self.__read_output_line()
- result.append(line)
+ line = await lines.get()
+ if isinstance(line, BaseException):
+ raise line
if line is None:
- return result
+ break
+ key, value = self._parse_pair(line, separator=": ")
+ key = key.lower()
+ if obj:
+ if key in delimiters:
+ yield obj
+ obj = {}
+ elif key in obj:
+ if not isinstance(obj[key], list):
+ obj[key] = [obj[key], value]
+ else:
+ obj[key].append(value)
+ continue
+ obj[key] = value
+ if obj:
+ yield obj
+
+ # as the above works for everyone who calls `return _parse_objects` but
+ # *not* for those that return list(_parse_objects(...))[0], that single
+ # function is rewritten here to use the original _parse_objects
+
+ @mpd_commands('count', 'currentsong', 'readcomments', 'stats', 'status')
+ def _parse_object(self, lines):
+ objs = list(SyncMPDClient._parse_objects(self, lines))
+ if not objs:
+ return {}
+ return objs[0]
# command provider interface
+ __wrap_async_iterator_parsers = [
+ # the very ones that return _parse_object directly
+ SyncMPDClient._parse_changes,
+ SyncMPDClient._parse_database,
+ SyncMPDClient._parse_messages,
+ SyncMPDClient._parse_mounts,
+ SyncMPDClient._parse_neighbors,
+ SyncMPDClient._parse_outputs,
+ SyncMPDClient._parse_playlists,
+ SyncMPDClient._parse_plugins,
+ SyncMPDClient._parse_songs,
+ ]
+
@classmethod
def add_command(cls, name, callback):
+ wrap_result = callback in cls.__wrap_async_iterator_parsers
+ command_class = CommandResultIterable if wrap_result else CommandResult
if hasattr(cls, name):
# twisted silently ignores them; probably, i'll make an
# experience that'll make me take the same router at some point.
raise AttributeError("Refusing to override the %s command"%name)
def f(self, *args):
- result = CommandResult(name, args, partial(callback, self))
+ result = command_class(name, args, partial(callback, self))
self.__commandqueue.put_nowait(result)
return result
escaped_name = name.replace(" ", "_")
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-mpd/python-mpd.git
More information about the Pkg-mpd-commits
mailing list