Python asyncio and await'ing multiple functions

One of the nicer features that is part of Python 3 is the new async / await functionality. This is now a feature of the Python interpreter and can for the most part replace older packages like Twisted or Eventlet. Some people may prefer the programming style of frameworks like Twisted, but I do not. The asyncio module gives you all the tools needed to concurrently run a bunch of tasks without really needing to worry too much about how any of this is impelemented. The biggest change is the need to use the async and await keywords in your Python code. As long as your tasks are somehow IO-bound (like a chat server) things work pretty well. All actual execution still takes place on a single operating system thread. Python is practically single-threaded anyways due to the GIL and it is unlikely this will change in the near future.

If you're looking for a basic introduction of how await / async is used, I would start with this guide.

Golang's select

The addition of async/await in Python really brings it up to par with other languages when you're trying to do a bunch of networking type stuff. I've built a few experimental projects now like a server that is able to stream an event feed to a browser using WebSockets. This was done by using the websockets package. It was a straightforward project and I did not encounter any steep learning curve. Back when Golang was just reaching version 1.0 I was a frequent user of Go at my job. One of the cooler features of go is the select statement. The select statement lets you wait on multiple statements until one of them has a result ready.

Here is an example, used under license from Mark McGranaghan from here

package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}

For those not familiar with Go, I'll briefly list out what this does

  1. It starts a program with two channels, c1 and c2. Channels in Go are async. event queues
  2. It starts two goroutines, each of which will put a message into one of the channels after a certain delay. These run concurrently to the main thread.
  3. The main thread waits on something to become available in the channels created in step 1, by using select.

The use of the select statement is really powerful, because until something is available in one of the channels the main thread is just idle and does nothing. As soon as any of the statements listed in select are ready then the main thread gets to process the result.

How to await multiple functions in Python

For projects I work on, it is common for me to want a single task to be able to wait for events from any number of sources. I prefer this programming pattern because it lets you bring in different types of data and do something with it in a serial fashion. This might seem kind of backwards, since it means a concurrent system has to funnel lots of events all through one codepath. But there are practical applications of this. Consider if you had a database table named user_last_active with columns user_id and last_active_time. You could use this as part of a website to track the last time the user was active on the website. The obvious answer would be to update the corresponding row for each user every time they viewed a page. This would basically mean every page view would generate a database update. This would not scale very well, since your database would have ever more updates being applied per second as traffic increased.

If you step back and consider you probably don't need to track each users activity down to the millisecond but instead down to the minute level, you can just update the table once a minute for all users that have been active in the past minute. To implement this with some sort of event driven system you really just need two sources of events.

  1. A source of events for user activity
  2. An event source that emits a special trigger event 60 seconds

When you receive events from source 1, you can just track the last active time for each user in a Python dict in memory. When the second event source event emits the trigger event you process the dict and update all the appropriate rows.

The equivalent class to Golang's channels in Python is asyncio.Queue. It's an event queue which is optionally bounded in the number of events it holds. When putting things in the queue you only have to await q.put(item) to block until the item is placed in the queue. If space is already available this returns immediately. To get items from the queue you just do item = await q.get(). This call blocks until an item can be taken from the queue.

Triggering once a minute might sound complex, but it isn't. To get an event source with a trigger event, every 60 seconds you really only need to wrap the builtin asyncio.sleep function.

async def after_one_minute():
  await asyncio.sleep(60.0)
  return 'trigger'

If you call after_one_minute() the result is not available for at least 60 seconds.

Where Python comes up short is if you want to wait on multiple queues at the same time. There is no direct equivalent to the select statement from Golang. The closest available things in Python are asyncio.as_completed and asyncio.wait. The documentation for those functions is available here.

Using asyncio.as_completed gives you back the results of coroutines you would normally await one by one, in whatever order they have to complete. Obviously, there is no control to the ordering of the results. So this isn't really helpful.

The asyncio.wait function by default waits for all awaitables to complete. So you don't get results as they become available. You can change this by passing a parameter of return_when set to asyncio.FIRST_COMPLETED to get the first result as soon as it is available.

Neither of these functions are directly comparable to using a Golang select statement inside a for loop either. The select statement allows you to keep getting items from a channel over and over provided it still has items in it. Trying to implement this in Python code winds up with something like this

q1 = asyncio.Queue() # pretend something puts stuff in this
q1 = asyncio.Queue() # pretend something puts stuff in this
done = False
results = []
while not done:
  # This really doesn't work, don't try it 
  done, pending = await asyncio.wait([q1.get(), q2.get()], return_when = asyncio.FIRST_COMPLETED)

  for d in done:
    results.append(d.result())
  done = len(results) >= 100 # run until we get some results

It turns out the above code has a multiplicity of problems

  1. The done and pending values are set type, and it is non-trivial to figure out where the result came from
  2. The done and pending contains objects of type asyncio.Task, even though coroutines were passed
  3. Each iteration of the loop calls both q1.get() and q2.get() creating new coroutines, even though the previous iterated only one of them actually completed.

So obviously, you can't just drop in asyncio.wait and have it do what you want. Problems #1 and #2 can be fixed by wrapping all your coroutines as tasks by calling asyncio.create_task(coro), keeping track of those tasks and then cmoparing them against what is returned in done and pending.

To avoid problem #3 you also have to track what coroutine actually was completed and remember to "start" it again by just calling the same function again. All of this can be combined into a very ugly piece of code like this

q1 = asyncio.Queue() # pretend something puts stuff in this
q2 = asyncio.Queue() # pretend something puts stuff in this

done = False
# List of things to get data from, list of methods, not actually calling them yet
sources = [q1.get, q2.get]
tasks = [None, None]
results_by_position = [[], []]
while not done:
  for i in range(2):
    # For any blank in the coroutines, call the source to fill it in
    if tasks[i] is None:
      tasks[i] =  asyncio.create_task(sources[i]())

  # wait on everything
  done, pending = await asyncio.wait(tasks, return_when = asyncio.FIRST_COMPLETED)

  # Go through each task, picking out the ones that are actually done  
  for i, task in enumerate(tasks):
    if task in done:
      tasks[i] = None # blank it out since it is done
      results[i].append(task.result()) # put the result in the buffer


  done = sum( len(x) for x in results ) >= 100 # run until we get some results

# so we're done
# but wait, some stuff is still running in tasks most likely and needs to be cancelled

This mostly solves the problems listed above, although you now have the issue of needing to call .cancel() on any leftovers in the tasks list. So as you can probably tell this is a huge amount of boilerplate for what is a conceptually simple problem. As you can probably guess I am not going to go about duplicating this all over my codebase. So I wrote a Python package called multi-await to implement all of this in a fairly generic way. It also does a other practical things like returning the results positionally in a list instead of in a Python set.

q1 = asyncio.Queue() # pretend something puts stuff in this
q1 = asyncio.Queue() # pretend something puts stuff in this

from multi_await import multi_await

async with multi_await() as m:
  m.add(q1.get)
  m.add(q2.get)

  results = [[],[]]

  done = False
  while not done:
    complete, failures = await m.get()
    for i, result in enumerate(completed):
      if result is not None:
        results[i].append(result) 

    for f in failures:
      if f is not None:
        # One of the coros returned an error
        print("Oops, this failed:" + f)
    done = sum( len(x) for x in results ) >= 100 # run until we get some results

This package adds a simple function called multi_await() that is usable as a context manager. For each function you want to run, just call m.add(fn) before you enter a loop. The context manager takes care of calling them when needed. You can use this with anything that returns a coroutine or an asyncio.Task object. Once setup is complete just do await m.get() to get the results positionally. This returns two lists, complete and failures. Since Python has exceptions, you still have the possibility of a coroutine raising an exception. This is why the failures list is also returned. Both lists have a length equal to the number of calls to theadd function.

So in the normal case, you'd have the following two arrays from the above code

completed  = ['some_item', None] # One result is filled in
failures = [None, None] # No errors

Since lists are returned, you can unpack them like this. It makes it obvious where an individual result comes from.

my_results_from_queue1, my_results_from_queue_2 = completed
if my_results_from_queue1 is not None:
  pass # do something with the value
if my_results_from_queue2 is not None:
  pass # do something with the value

It is of course possible that every single value in completed is present if all the coroutines managed to complete. In the case of couroutines raising exceptions you'll have the exception values in the corresponding positions in the failures list. These are returned as values because I figure it is easier to work with than trying to add a try/catch into your code.

If you can't use the context manager version for some reason, the multi_await.MultiAwait class can be instantiated. You need to manually call the .cancel() method on that object to cancel any pending tasks when you are done with it.

You can find my package on PyPi (so you can install it with pip) or it is available on GitHub.

Github: https://github.com/hydrogen18/multi-await

PyPi: https://pypi.org/project/multi-await/


Copyright Eric Urban 2020, or the respective entity where indicated