Python asyncio and await'ing multiple functions
One of the nicer features of Python 3 is the new async/await functionality. The async and await keywords are now part of the language itself and the asyncio module ships in the standard library, so for the most part they can replace older packages like Twisted or Eventlet. Some people may prefer the programming style of frameworks like Twisted, but I do not. The asyncio module gives you all the tools needed to run a bunch of tasks concurrently without really needing to worry too much about how any of this is implemented. The biggest change is the need to use the async and await keywords in your Python code. As long as your tasks are mostly IO-bound (like a chat server) things work pretty well. All actual execution still takes place on a single operating system thread. Python is practically single-threaded anyway due to the GIL, and it is unlikely this will change in the near future.
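To make the single-thread point concrete, here is a minimal sketch in which asyncio.sleep stands in for real IO (an assumption for the example; fake_io is a hypothetical name). The two coroutines wait concurrently, so the total runtime tracks the longest delay rather than the sum, and both run on the same OS thread:

```python
import asyncio
import threading
import time


async def fake_io(name, delay, seen_threads):
    # Record which OS thread this coroutine is running on.
    seen_threads.add(threading.get_ident())
    # asyncio.sleep stands in for real IO such as a socket read.
    await asyncio.sleep(delay)
    return name


async def main():
    seen_threads = set()
    start = time.perf_counter()
    # gather runs both coroutines concurrently and returns results in argument order.
    results = await asyncio.gather(
        fake_io("a", 0.2, seen_threads),
        fake_io("b", 0.1, seen_threads),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed, len(seen_threads)


results, elapsed, thread_count = asyncio.run(main())
# Expect roughly 0.2s total (the longest delay), not 0.3s (the sum), on one thread.
print(results, f"{elapsed:.2f}s", thread_count)
```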
If you're looking for a basic introduction to how async/await is used, I would start with this guide.
Golang's select
The addition of async/await really brings Python up to par with other languages when you're trying to do a bunch of networking type stuff. I've built a few experimental projects now, like a server that is able to stream an event feed to a browser using WebSockets. This was done with the websockets package. It was a straightforward project and I did not encounter any steep learning curve. Back when Golang was just reaching version 1.0, I was a frequent user of Go at my job. One of the cooler features of Go is the select statement, which lets you wait on multiple channel operations until one of them is ready.
Here is an example, used under license from Mark McGranaghan from here
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	c1 := make(chan string)
	c2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		c1 <- "one"
	}()
	go func() {
		time.Sleep(2 * time.Second)
		c2 <- "two"
	}()

	for i := 0; i < 2; i++ {
		select {
		case msg1 := <-c1:
			fmt.Println("received", msg1)
		case msg2 := <-c2:
			fmt.Println("received", msg2)
		}
	}
}
```
For those not familiar with Go, I'll briefly list out what this does:

- It creates two channels, c1 and c2. Channels in Go are typed queues that goroutines use to pass values to each other.
- It starts two goroutines, each of which puts a message into one of the channels after a certain delay. These run concurrently with the main function.
- The main function uses select to wait until something becomes available on either of the two channels. Because the select sits inside a loop that runs twice, it prints both messages: "one" after about a second and "two" after about two seconds.
The select statement is really powerful, because until something is available in one of the channels the main goroutine is just idle and does nothing. As soon as any of the cases listed in the select is ready, the main goroutine gets to process the result.
How to await multiple functions in Python
For projects I work on, it is common for me to want a single task to be able to wait for events from any number of sources. I prefer this programming pattern because it lets you bring in different types of data and handle them serially. This might seem kind of backwards, since it means a concurrent system has to funnel lots of events through one codepath. But there are practical applications of this. Consider a database table named user_last_active with columns user_id and last_active_time. You could use this as part of a website to track the last time each user was active on the site. The obvious approach would be to update the corresponding row for a user every time they view a page. That means every page view generates a database update, which does not scale well: the number of updates your database applies per second grows in step with traffic.
If you step back, you probably don't need to track each user's activity down to the millisecond; minute-level granularity is enough. Then you can just update the table once a minute for all users who have been active in the past minute. To implement this with an event-driven system you really just need two sources of events:

- A source of events for user activity
- An event source that emits a special trigger event every 60 seconds
When you receive events from source 1, you just record the last active time for each user in a Python dict in memory. When source 2 emits its trigger event, you process the dict and update all the appropriate rows.
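The in-memory bookkeeping can be sketched as follows. ActivityBuffer, record and flush are hypothetical names, and the actual write to user_last_active is left out; the point is that many page views from one user collapse into a single row per interval:

```python
import time
from typing import Dict, List, Optional, Tuple


class ActivityBuffer:
    """Hypothetical helper: collects last-active times and hands them out in batches."""

    def __init__(self) -> None:
        self._last_active: Dict[int, float] = {}

    def record(self, user_id: int, when: Optional[float] = None) -> None:
        # Each page view only touches the dict; a later view overwrites an earlier one.
        self._last_active[user_id] = time.time() if when is None else when

    def flush(self) -> List[Tuple[int, float]]:
        # Called on the once-a-minute trigger: return one row per active user
        # (to be written to the database elsewhere) and start a fresh dict.
        rows = sorted(self._last_active.items())
        self._last_active = {}
        return rows


buf = ActivityBuffer()
buf.record(1, when=10.0)
buf.record(2, when=15.0)
buf.record(1, when=20.0)  # user 1 viewed another page
rows = buf.flush()
print(rows)  # [(1, 20.0), (2, 15.0)]
```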
The closest equivalent to Golang's channels in Python is asyncio.Queue. It's an event queue that can optionally be bounded in the number of items it holds (via its maxsize argument). To put something in the queue you only have to await q.put(item), which waits until the item is placed in the queue; if space is already available, it returns immediately. To get items from the queue you just do item = await q.get(). This call waits until an item can be taken from the queue.
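A minimal producer/consumer sketch of those two calls follows. The bound of maxsize=2 and the None sentinel used to signal the end are choices made for this example, not something asyncio.Queue requires:

```python
import asyncio


async def producer(q, items):
    for item in items:
        # Waits only while the queue is full (maxsize reached).
        await q.put(item)
    await q.put(None)  # sentinel: nothing more is coming


async def consumer(q):
    received = []
    while True:
        # Waits until an item is available.
        item = await q.get()
        if item is None:
            break
        received.append(item)
    return received


async def main():
    q = asyncio.Queue(maxsize=2)  # bounded: at most 2 items buffered
    producer_task = asyncio.create_task(producer(q, ["a", "b", "c", "d"]))
    received = await consumer(q)
    await producer_task
    return received


received = asyncio.run(main())
print(received)  # ['a', 'b', 'c', 'd']
```

Because the queue holds at most two items, the producer's await q.put(item) waits whenever the consumer falls behind, which is how a bounded queue applies back-pressure.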
Triggering once a minute might sound complex, but it isn't. To get an event source that emits a trigger event every 60 seconds, you really only need to wrap the builtin asyncio.sleep function.
```python
import asyncio


async def after_one_minute():
    await asyncio.sleep(60.0)
    return 'trigger'
```
If you await after_one_minute(), the result is not available for at least 60 seconds.
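To get a source that keeps firing rather than one that fires once, one option is to call asyncio.sleep in a loop and push each trigger into an asyncio.Queue. ticker is a hypothetical name, and a short interval stands in for 60 seconds so the sketch finishes quickly:

```python
import asyncio


async def ticker(q, interval, count):
    # Re-run the same sleep in a loop so the queue receives a 'trigger'
    # every `interval` seconds. `count` bounds the loop so the demo ends;
    # a real ticker would typically loop forever and be cancelled.
    for _ in range(count):
        await asyncio.sleep(interval)
        await q.put("trigger")


async def main():
    q = asyncio.Queue()
    task = asyncio.create_task(ticker(q, interval=0.05, count=3))
    events = [await q.get() for _ in range(3)]
    await task
    return events


events = asyncio.run(main())
print(events)  # ['trigger', 'trigger', 'trigger']
```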
Where Python comes up short is waiting on multiple queues at the same time. There is no direct equivalent to the select statement from Golang. The closest things available in Python are asyncio.as_completed and asyncio.wait. The documentation for those functions is available here.
asyncio.as_completed gives you back the results of coroutines you would normally await one by one, in whatever order they happen to complete. There is no control over the ordering of the results, and for plain coroutines nothing directly tells you which one produced a given result. So this isn't really helpful.
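A short sketch of that behaviour (labelled is a hypothetical helper). Results arrive in completion order, and the only reason we can tell which is which here is that each coroutine returns its own label:

```python
import asyncio


async def labelled(label, delay):
    await asyncio.sleep(delay)
    return label


async def main():
    coros = [labelled("a", 0.15), labelled("b", 0.05), labelled("c", 0.10)]
    order = []
    # as_completed yields awaitables in completion order, not argument order.
    for next_done in asyncio.as_completed(coros):
        order.append(await next_done)
    return order


order = asyncio.run(main())
print(order)  # ['b', 'c', 'a']
```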
The asyncio.wait function by default waits for all awaitables to complete, so you don't get results as they become available. You can change this by passing return_when=asyncio.FIRST_COMPLETED to get control back as soon as the first result is available.
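A small sketch of asyncio.wait with return_when=asyncio.FIRST_COMPLETED, using explicit tasks (labelled is again a hypothetical helper). Note that the slower task is left in pending and keeps running until it is cancelled:

```python
import asyncio


async def labelled(label, delay):
    await asyncio.sleep(delay)
    return label


async def main():
    fast = asyncio.create_task(labelled("fast", 0.05))
    slow = asyncio.create_task(labelled("slow", 1.0))
    # Returns as soon as at least one task finishes; both values are sets of tasks.
    done, pending = await asyncio.wait({fast, slow}, return_when=asyncio.FIRST_COMPLETED)
    results = [t.result() for t in done]
    # Anything still pending keeps running unless we cancel it.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return results, fast in done, slow in pending


results, fast_done, slow_pending = asyncio.run(main())
print(results, fast_done, slow_pending)  # ['fast'] True True
```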
Neither of these functions is directly comparable to using a Golang select statement inside a for loop, either. The select statement lets you keep receiving items from a channel over and over, as long as it still has items in it. Trying to implement this in Python winds up with something like this:
```python
import asyncio


async def main():
    q1 = asyncio.Queue()  # pretend something puts stuff in this
    q2 = asyncio.Queue()  # pretend something puts stuff in this
    finished = False
    results = []
    while not finished:
        # This really doesn't work, don't try it
        done, pending = await asyncio.wait([q1.get(), q2.get()],
                                           return_when=asyncio.FIRST_COMPLETED)
        for d in done:
            results.append(d.result())
        finished = len(results) >= 100  # run until we get some results
```
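It turns out the above code has multiple problems:

1. The done and pending values are sets, so it is non-trivial to figure out where each result came from.
2. done and pending contain asyncio.Task objects, even though coroutines were passed in. (Since Python 3.11, passing bare coroutines to asyncio.wait is not allowed at all and raises a TypeError.)
3. Each iteration of the loop calls both q1.get() and q2.get(), creating new coroutines, even though only one of them actually completed in the previous iteration. The one that didn't complete is still pending as a task, so it can later take an item off its queue whose result is never read.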
So obviously, you can't just drop in asyncio.wait and have it do what you want. Problems #1 and #2 can be fixed by wrapping all your coroutines in tasks with asyncio.create_task(coro), keeping track of those tasks, and then comparing them against what is returned in done and pending.
To avoid problem #3 you also have to track which coroutine actually completed and remember to "start" it again by calling the same function again. All of this can be combined into a very ugly piece of code like this:
```python
import asyncio


async def main():
    q1 = asyncio.Queue()  # pretend something puts stuff in this
    q2 = asyncio.Queue()  # pretend something puts stuff in this
    finished = False
    # List of things to get data from: the methods themselves, not calls to them yet
    sources = [q1.get, q2.get]
    tasks = [None, None]
    results_by_position = [[], []]  # one result buffer per source
    while not finished:
        for i in range(2):
            # For any blank slot, call the source to fill it in with a new task
            if tasks[i] is None:
                tasks[i] = asyncio.create_task(sources[i]())
        # wait on everything
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # Go through each task, picking out the ones that are actually done
        for i, task in enumerate(tasks):
            if task in done:
                tasks[i] = None  # blank it out since it is done
                results_by_position[i].append(task.result())  # put the result in the buffer
        finished = sum(len(x) for x in results_by_position) >= 100  # run until we get some results
    # so we're done
    # but wait, some stuff is still running in tasks most likely and needs to be cancelled
```
This mostly solves the problems listed above, although you now have the issue of needing to call .cancel() on any leftovers in the tasks list. As you can probably tell, this is a huge amount of boilerplate for a conceptually simple problem, and I am not going to duplicate it all over my codebase. So I wrote a Python package called multi-await to implement all of this in a fairly generic way. It also does a few other practical things, like returning the results positionally in a list instead of in a Python set.
```python
import asyncio

from multi_await import multi_await


async def main():
    q1 = asyncio.Queue()  # pretend something puts stuff in this
    q2 = asyncio.Queue()  # pretend something puts stuff in this
    async with multi_await() as m:
        m.add(q1.get)
        m.add(q2.get)
        results = [[], []]
        done = False
        while not done:
            completed, failures = await m.get()
            for i, result in enumerate(completed):
                if result is not None:
                    results[i].append(result)
            for f in failures:
                if f is not None:
                    # One of the coros raised an exception
                    print("Oops, this failed: " + repr(f))
            done = sum(len(x) for x in results) >= 100  # run until we get some results
```
The package provides a simple function called multi_await() that is usable as an async context manager. For each function you want to run, just call m.add(fn) before you enter a loop; the context manager takes care of calling them when needed. You can use this with anything that returns a coroutine or an asyncio.Task object. Once setup is complete, just do await m.get() to get the results positionally. This returns two lists, completed and failures. Since Python has exceptions, a coroutine can still raise one instead of returning a result, which is why the failures list is also returned. Both lists have a length equal to the number of calls to the add function.
So in the normal case, you'd have the following two lists from the above code:
```python
completed = ['some_item', None]  # One result is filled in
failures = [None, None]  # No errors
```
Since lists are returned, you can unpack them like this, which makes it obvious where an individual result comes from:
```python
my_results_from_queue1, my_results_from_queue2 = completed
if my_results_from_queue1 is not None:
    pass  # do something with the value
if my_results_from_queue2 is not None:
    pass  # do something with the value
```
Of course, every single value in completed may be present if all the coroutines managed to complete. If coroutines raise exceptions, you'll have the exception values in the corresponding positions in the failures list. These are returned as values because I figure that is easier to work with than adding a try/except to your code.
If you can't use the context manager version for some reason, you can instantiate the multi_await.MultiAwait class directly. You then need to manually call the .cancel() method on that object to cancel any pending tasks when you are done with it.
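For comparison, the same pattern can be wrapped in a small class of your own. The sketch below is not the multi-await implementation; SimpleSelect and its methods are hypothetical names that only mirror the interface described above (add() before the loop, get() returning positional completed and failures lists, cancellation on exit). Like the package, it uses None to mean "no result from this source yet", so a source that legitimately returns None would be indistinguishable from one that hasn't finished:

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class SimpleSelect:
    """Hypothetical select-like helper for illustration (not multi-await's code)."""

    def __init__(self) -> None:
        # Zero-argument callables returning awaitables, e.g. q.get
        self._sources: List[Callable[[], Awaitable]] = []
        # One slot per source; None means "needs to be (re)started"
        self._tasks: List[Optional[asyncio.Future]] = []

    def add(self, source: Callable[[], Awaitable]) -> None:
        self._sources.append(source)
        self._tasks.append(None)

    async def get(self):
        # Restart only the sources whose previous task already finished.
        for i, source in enumerate(self._sources):
            if self._tasks[i] is None:
                self._tasks[i] = asyncio.ensure_future(source())
        done, _ = await asyncio.wait(self._tasks, return_when=asyncio.FIRST_COMPLETED)
        completed = [None] * len(self._tasks)
        failures = [None] * len(self._tasks)
        for i, task in enumerate(self._tasks):
            if task in done:
                self._tasks[i] = None  # restarted on the next get()
                if task.cancelled():
                    failures[i] = asyncio.CancelledError()
                elif task.exception() is not None:
                    failures[i] = task.exception()  # exception returned as a value
                else:
                    completed[i] = task.result()
        return completed, failures

    async def __aenter__(self) -> "SimpleSelect":
        return self

    async def __aexit__(self, *exc_info) -> None:
        # Cancel leftovers so no pending get() swallows a later item.
        pending = [t for t in self._tasks if t is not None]
        for t in pending:
            t.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        self._tasks = [None] * len(self._sources)


async def main():
    q1 = asyncio.Queue()
    q2 = asyncio.Queue()
    for n in range(3):
        q1.put_nowait(f"q1-{n}")
    q2.put_nowait("q2-0")
    collected = [[], []]
    async with SimpleSelect() as sel:
        sel.add(q1.get)
        sel.add(q2.get)
        while sum(len(x) for x in collected) < 4:
            completed, failures = await sel.get()
            for i, value in enumerate(completed):
                if value is not None:
                    collected[i].append(value)
    return collected


collected = asyncio.run(main())
print(collected)  # [['q1-0', 'q1-1', 'q1-2'], ['q2-0']]
```

The only task that is restarted on each call is the one that just finished, which is what avoids problem #3; the still-pending q2.get() task is cancelled when the async with block exits.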
Links
You can find my package on PyPI (so you can install it with pip), or on GitHub.