[Pkg-mpd-commits] [python-mpd] 52/91: asyncio: Allow streaming of the `return _parse_objects` commands

Simon McVittie smcv at debian.org
Sat Feb 24 14:55:35 UTC 2018


This is an automated email from the git hooks/post-receive script.

smcv pushed a commit to branch debian/master
in repository python-mpd.

commit fe93e2a349795a442b68308094da0da761a08c4c
Author: chrysn <chrysn at fsfe.org>
Date:   Fri Apr 7 09:19:07 2017 +0200

    asyncio: Allow streaming of the `return _parse_objects` commands
---
 mpd/asyncio.py | 130 +++++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 107 insertions(+), 23 deletions(-)

diff --git a/mpd/asyncio.py b/mpd/asyncio.py
index 4eb42cf..cd35379 100644
--- a/mpd/asyncio.py
+++ b/mpd/asyncio.py
@@ -5,9 +5,9 @@ from mpd.base import HELLO_PREFIX, ERROR_PREFIX, SUCCESS
 from mpd.base import MPDClientBase
 from mpd.base import MPDClient as SyncMPDClient
 from mpd.base import ProtocolError, ConnectionError, CommandError
-from mpd.base import mpd_command_provider
+from mpd.base import mpd_command_provider, mpd_commands
 
-class CommandResult(asyncio.Future):
+class BaseCommandResult(asyncio.Future):
     """A future that carries its command/args/callback with it for the
     convenience of passing it around to the command queue."""
 
@@ -15,18 +15,60 @@ class CommandResult(asyncio.Future):
         super().__init__()
         self._command = command
         self._args = args
-        self.__callback = callback
+        self._callback = callback
+
+class CommandResult(BaseCommandResult):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
         self.__spooled_lines = []
 
     def _feed_line(self, line):
         """Put the given line into the callback machinery, and set the result on a None line."""
         if line is None:
-            self.set_result(self.__callback(self.__spooled_lines))
+            self.set_result(self._callback(self.__spooled_lines))
         else:
             self.__spooled_lines.append(line)
 
-    async def __aiter__(self):
-        raise NotImplementedError("async for x in clientcommand() is work in progress")
+    def _feed_error(self, error):
+        self.set_exception(error)
+
+class CommandResultIterable(BaseCommandResult):
+    """Variant of CommandResult where the underlying callback is an
+    asynchronous generator, and can thus interpret lines as they come along.
+
+    The result can be used with the aiter interface (`async for`). If it is
+    still used as a future instead, it eventually results in a list.
+
+    Commands used with this CommandResult must treat their passed lines not as
+    an iterable (as in the synchronous implementation), but as an asyncio.Queue.
+    Furthermore, they must check whether the queue elements are exceptions, and
+    raise them.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.__spooled_lines = asyncio.Queue()
+
+    def _feed_line(self, line):
+        self.__spooled_lines.put_nowait(line)
+
+    _feed_error = _feed_line
+
+    def __await__(self):
+        asyncio.Task(self.__feed_future())
+        return super().__await__()
+
+    async def __feed_future(self):
+        result = []
+        async for r in self:
+            result.append(r)
+        self.set_result(result)
+
+    def __aiter__(self):
+        if self.done():
+            raise RuntimeError("Command result is already being consumed")
+        return self._callback(self.__spooled_lines).__aiter__()
+
 
 @mpd_command_provider
 class MPDClient(MPDClientBase):
@@ -56,15 +98,15 @@ class MPDClient(MPDClientBase):
         while True:
             result = await self.__commandqueue.get()
             self._write_command(result._command, result._args)
-            try:
-                responselines = await self.__read_full_output()
-                for l in responselines:
-                    result._feed_line(l)
-            except Exception as e:
-                # may be CommandError flying out of __read_fulloutput or any kind of exception from the callback
-                result.set_exception(e)
-            finally:
-                assert result.done(), "Result %s not done after __read_full_output was processed"%(result,)
+            while True:
+                try:
+                    l = await self.__read_output_line()
+                except CommandError as e:
+                    result._feed_error(e)
+                    break
+                result._feed_line(l)
+                if l is None:
+                    break
 
     # helper methods
 
@@ -81,7 +123,7 @@ class MPDClient(MPDClientBase):
         """Wrapper around .__wfile.write that handles encoding."""
         self.__wfile.write(text.encode('utf8'))
 
-    # copied stuff from base
+    # copied and subtly modified stuff from base
 
     async def __hello(self):
         # not catching the timeout error, it's actually pretty adaequate
@@ -121,25 +163,67 @@ class MPDClient(MPDClientBase):
             return None
         return line
 
-    async def __read_full_output(self):
-        """Kind of like SyncMPDClient._read_lines, but without the iteration"""
-        result = []
+    async def _parse_objects(self, lines, delimiters=[]):
+        """Like _parse_objects, but waits for lines"""
+        obj = {}
         while True:
-            line = await self.__read_output_line()
-            result.append(line)
+            line = await lines.get()
+            if isinstance(line, BaseException):
+                raise line
             if line is None:
-                return result
+                break
+            key, value = self._parse_pair(line, separator=": ")
+            key = key.lower()
+            if obj:
+                if key in delimiters:
+                    yield obj
+                    obj = {}
+                elif key in obj:
+                    if not isinstance(obj[key], list):
+                        obj[key] = [obj[key], value]
+                    else:
+                        obj[key].append(value)
+                    continue
+            obj[key] = value
+        if obj:
+            yield obj
+
+    # as the above works for everyone who calls `return _parse_objects` but
+    # *not* for those that return list(_parse_objects(...))[0], that single
+    # function is rewritten here to use the original _parse_objects
+
+    @mpd_commands('count', 'currentsong', 'readcomments', 'stats', 'status')
+    def _parse_object(self, lines):
+        objs = list(SyncMPDClient._parse_objects(self, lines))
+        if not objs:
+            return {}
+        return objs[0]
 
     # command provider interface
 
+    __wrap_async_iterator_parsers = [
+            # the very ones that return _parse_objects directly
+            SyncMPDClient._parse_changes,
+            SyncMPDClient._parse_database,
+            SyncMPDClient._parse_messages,
+            SyncMPDClient._parse_mounts,
+            SyncMPDClient._parse_neighbors,
+            SyncMPDClient._parse_outputs,
+            SyncMPDClient._parse_playlists,
+            SyncMPDClient._parse_plugins,
+            SyncMPDClient._parse_songs,
+            ]
+
     @classmethod
     def add_command(cls, name, callback):
+        wrap_result = callback in cls.__wrap_async_iterator_parsers
+        command_class = CommandResultIterable if wrap_result else CommandResult
         if hasattr(cls, name):
             # twisted silently ignores them; probably, i'll make an
             # experience that'll make me take the same router at some point.
             raise AttributeError("Refusing to override the %s command"%name)
         def f(self, *args):
-            result = CommandResult(name, args, partial(callback, self))
+            result = command_class(name, args, partial(callback, self))
             self.__commandqueue.put_nowait(result)
             return result
         escaped_name = name.replace(" ", "_")

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-mpd/python-mpd.git



More information about the Pkg-mpd-commits mailing list