[Pkg-mpd-commits] [python-mpd] 62/91: asyncio: Automatically idle in command loop

Simon McVittie smcv at debian.org
Sat Feb 24 14:55:37 UTC 2018


This is an automated email from the git hooks/post-receive script.

smcv pushed a commit to branch debian/master
in repository python-mpd.

commit 4eee007576e243345b2fcfb3172a6d9f0516a595
Author: chrysn <chrysn at fsfe.org>
Date:   Fri Apr 21 13:34:07 2017 +0200

    asyncio: Automatically idle in command loop
    
    The idle implementation included here is not what the idle interface
    should finally look like, but is sufficient for testing with a single
    task.
---
 examples/asyncio_example.py |   2 +-
 mpd/asyncio.py              | 123 +++++++++++++++++++++++++++++++++-----------
 2 files changed, 93 insertions(+), 32 deletions(-)

diff --git a/examples/asyncio_example.py b/examples/asyncio_example.py
index 194d3cd..4ea8fac 100644
--- a/examples/asyncio_example.py
+++ b/examples/asyncio_example.py
@@ -47,7 +47,7 @@ async def main():
     except Exception as e:
         print("An erroneous asynchronously looped command, as expected, raised:", e)
 
-    print("Idle result", list(await client.idle()))
+    print("Idle result", await client.idle().get())
 
 if __name__ == '__main__':
     asyncio.get_event_loop().run_until_complete(main())
diff --git a/mpd/asyncio.py b/mpd/asyncio.py
index b18b807..98c5843 100644
--- a/mpd/asyncio.py
+++ b/mpd/asyncio.py
@@ -97,6 +97,15 @@ class CommandResultIterable(BaseCommandResult):
 class MPDClient(MPDClientBase):
     __run_task = None # doubles as indicator for being connected
 
+    #: When in idle, this is a Future on which incoming commands should set a
+    #: result. (This works around asyncio.Queue not having a .peek() coroutine)
+    __command_enqueued = None
+
+    #: Seconds after a command's completion to send idle. Setting this too high
+    # causes "blind spots" in the client's view of the server, setting it too
+    # low sends needless idle/noidle after commands in quick succession.
+    IMMEDIATE_COMMAND_TIMEOUT = 0.1
+
     async def connect(self, host, port=6600, loop=None):
         self.__loop = loop
 
@@ -107,6 +116,8 @@ class MPDClient(MPDClientBase):
         self.__rfile, self.__wfile = r, w
 
         self.__commandqueue = asyncio.Queue(loop=loop)
+        self.__idle_results = asyncio.Queue(loop=loop) #: a queue of CommandResult("idle") futures
+        self.__idle_events = asyncio.Queue(loop=loop) #: a temporary dispatch mechanism, fed with subsystem strings
 
         try:
             helloline = await asyncio.wait_for(self.__readline(), timeout=5)
@@ -117,44 +128,84 @@ class MPDClient(MPDClientBase):
         SyncMPDClient._hello(self, helloline)
 
         self.__run_task = asyncio.Task(self.__run())
+        self.__idle_task = asyncio.Task(self.__distribute_idle_results())
 
     def disconnect(self):
         if self.__run_task is not None: # is None eg. when connection fails in .connect()
             self.__run_task.cancel()
+        if self.__idle_task is not None:
+            self.__idle_task.cancel()
         self.__rfile = self.__wfile = None
-        self.__run_task = self.__commandqueue = None
+        self.__run_task = self.__idle_task = None
+        self.__commandqueue = self.__command_enqueued = None
+        self.__idle_results = self.__idle_events = None
 
     async def __run(self):
-        # if this actually raises (showing as "Task exception was never
-        # retrieved"), this is indicative of an implementation error in
-        # mpd.asyncio; no network behavior should be able to trigger uncaught
-        # exceptions here.
-        while True:
-            result = await self.__commandqueue.get()
-            try:
+        result = None
+
+        try:
+            while True:
+                try:
+                    result = await asyncio.wait_for(
+                            self.__commandqueue.get(),
+                            timeout=self.IMMEDIATE_COMMAND_TIMEOUT,
+                            loop=self.__loop,
+                            )
+                except asyncio.TimeoutError:
+                    # the cancellation of the __commandqueue.get() that happens
+                    # in this case is intended, and is just what asyncio.Queue
+                    # suggests for "get with timeout".
+
+                    result = CommandResult("idle", [], self._parse_list)
+                    self.__idle_results.put_nowait(result)
+
+                    self.__command_enqueued = asyncio.Future()
+
                 self._write_command(result._command, result._args)
-            except Exception as e:
+                while True:
+                    try:
+                        if self.__command_enqueued is not None:
+                            # we're in idle mode
+                            line_future = asyncio.shield(self.__read_output_line())
+                            await asyncio.wait([line_future, self.__command_enqueued],
+                                    return_when=asyncio.FIRST_COMPLETED)
+                            if self.__command_enqueued.done():
+                                self._write_command("noidle")
+                                self.__command_enqueued = None
+                            l = await line_future
+                        else:
+                            l = await self.__read_output_line()
+                    except CommandError as e:
+                        result._feed_error(e)
+                        break
+                    result._feed_line(l)
+                    if l is None:
+                        break
+
+                result = None
+
+        except Exception as e:
+            # prevent the destruction of the pending task in the shutdown
+            # function -- it's just shutting down by itself
+            self.__run_task = None
+            self.disconnect()
+
+            if result is not None:
                 result._feed_error(e)
-                # prevent the destruction of the pending task in the shutdown
-                # function -- it's just shutting down by itself
-                self.__run_task = None
-                self.disconnect()
                 return
-            while True:
-                try:
-                    l = await self.__read_output_line()
-                except CommandError as e:
-                    result._feed_error(e)
-                    break
-                except Exception as e:
-                    # see above
-                    result._feed_error(e)
-                    self.__run_task = None
-                    self.disconnect()
-                    return
-                result._feed_line(l)
-                if l is None:
-                    break
+            else:
+                # typically this is a bug in mpd.asyncio
+                raise
+
+    async def __distribute_idle_results(self):
+        # an exception flying out of here probably means a connection
+        # interruption during idle. this will just show like any other
+        # unhandled task exception and that's probably the best we can do
+        while True:
+            result = await self.__idle_results.get()
+            idle_changes = await result
+            for change in idle_changes:
+                self.__idle_events.put_nowait(change)
 
     # helper methods
 
@@ -268,15 +319,25 @@ class MPDClient(MPDClientBase):
     def add_command(cls, name, callback):
         command_class = CommandResultIterable if callback.mpd_commands_direct else CommandResult
         if hasattr(cls, name):
-            # twisted silently ignores them; probably, i'll make an
-            # experience that'll make me take the same router at some point.
-            raise AttributeError("Refusing to override the %s command"%name)
+            # idle and noidle are explicitly implemented, skipping them
+            return
         def f(self, *args):
             result = command_class(name, args, partial(callback, self))
             if self.__run_task is None:
                 raise ConnectionError("Can not send command to disconnected client")
             self.__commandqueue.put_nowait(result)
+            if self.__command_enqueued is not None and not self.__command_enqueued.done():
+                self.__command_enqueued.set_result(None)
             return result
         escaped_name = name.replace(" ", "_")
         f.__name__ = escaped_name
         setattr(cls, escaped_name, f)
+
+    # commands that just work differently
+
+    def idle(self, subsystems=[]):
+        # FIXME this is not the final interface
+        return self.__idle_events
+
+    def noidle(self):
+        raise AttributeError("noidle is not supported / required in mpd.asyncio")

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-mpd/python-mpd.git



More information about the Pkg-mpd-commits mailing list