Python asyncio stumbling block: aborting tasks
I use Python's async/await features in all kinds of software. If you have multiple things you want a script to do concurrently, it is an easy choice. As long as your work isn't CPU bound, it works well.
But one thing I have consistently run into is managing a bunch of asyncio tasks. By default, calling any async function in Python gives you a coroutine. When you want to run many coroutines simultaneously and wait on the results, you need to convert them to tasks using asyncio.create_task. A list of these tasks is then passed to a function like asyncio.wait. This leads to an embarrassingly common problem that is easy to demonstrate:
import asyncio

async def task_that_fails():
    raise RuntimeError('task failed')

async def async_main():
    task0 = asyncio.create_task(asyncio.sleep(10.0))
    task1 = asyncio.create_task(task_that_fails())
    done, pending = await asyncio.wait([task0, task1], timeout=1.0)
    for t in done:
        await t

asyncio.run(async_main())
If you think this would output a message about RuntimeError: task failed, it does. The first task, task0, is waiting on asyncio.sleep(10.0) to complete. This takes at least 10 seconds. The second task, task1, fails immediately by raising an exception. The call to asyncio.wait specifies a timeout of no more than 1 second. So after one second, asyncio.wait returns. The first set, done, contains task1, but the second set, pending, has task0 in it. The exception from task1 propagates when await t is called inside the for loop.
Traceback (most recent call last):
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example.py", line 13, in <module>
    asyncio.run(async_main())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example.py", line 11, in async_main
    await t
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example.py", line 4, in task_that_fails
    raise RuntimeError('task failed')
RuntimeError: task failed
So what happens to task0? In this example, nothing at all. It's still running at the time the interpreter exits. In this small example, that isn't an issue. But in a long running program, it means task0 would still be running and doing things. This probably isn't the intent of the code, especially since there is no easy way to reference task0 later.
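If you want to see this for yourself, one quick diagnostic is to ask the event loop what it still has in flight. This snippet is not part of the original example; asyncio.all_tasks() returns every task the running loop has not finished yet, so task0 (and the task wrapping async_main itself) shows up, while the already-finished task1 does not.

import asyncio

async def task_that_fails():
    raise RuntimeError('task failed')

async def async_main():
    task0 = asyncio.create_task(asyncio.sleep(10.0), name='task0')
    task1 = asyncio.create_task(task_that_fails(), name='task1')
    done, pending = await asyncio.wait([task0, task1], timeout=1.0)
    # Ask the loop which tasks are still unfinished: task0 and the task
    # running async_main() itself appear here, task1 does not.
    for t in asyncio.all_tasks():
        print('still running:', t.get_name())
    for t in done:
        await t

asyncio.run(async_main())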
One common solution is to replace asyncio.wait with asyncio.gather, like this:
import asyncio

async def task_that_fails():
    raise RuntimeError('task failed')

async def async_main():
    task0 = asyncio.create_task(asyncio.sleep(10.0))
    task1 = asyncio.create_task(task_that_fails())
    await asyncio.gather(task0, task1)

asyncio.run(async_main())
This does basically the same thing, except asyncio.gather returns as soon as the exception occurs in task1. In both cases, task0 is left running. Also, you can't specify a timeout with asyncio.gather. This means the code potentially waits forever on tasks that are working, but are very slow.
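If the missing timeout is the only concern, one workaround is to wrap the gather in asyncio.wait_for. This is just a sketch of that idea: on a timeout, wait_for cancels the gather, and cancelling a gather also cancels the tasks it is waiting on. It does not fix the failure case, though; when task1 raises, task0 is still left running exactly as before.

import asyncio

async def task_that_fails():
    raise RuntimeError('task failed')

async def async_main():
    task0 = asyncio.create_task(asyncio.sleep(10.0))
    task1 = asyncio.create_task(task_that_fails())
    # If the timeout expires, wait_for cancels the gather, which in turn
    # cancels task0 and task1. An exception from either task still
    # propagates immediately and leaves the other task running.
    await asyncio.wait_for(asyncio.gather(task0, task1), timeout=1.0)

asyncio.run(async_main())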
So how do we handle this? Well, any task created with asyncio.create_task has a .cancel() method that can be used to cancel it. So, let's refactor the original example to cancel any pending tasks.
import asyncio

async def task_that_fails():
    raise RuntimeError('task failed')

async def async_main():
    task0 = asyncio.create_task(asyncio.sleep(10.0))
    task1 = asyncio.create_task(task_that_fails())
    done, pending = await asyncio.wait([task0, task1], timeout=1.0)
    for t in pending:  # cancel anything running
        t.cancel()
    for t in done:
        await t

asyncio.run(async_main(), debug=True)
By iterating over pending, each task that is still running can be cancelled. In this case, only task0 is in pending. Great! Now consider this more interesting example.
import asyncio

async def task_that_fails():
    raise RuntimeError('task failed')

async def cleanup_task():
    i = 0
    try:
        f = open('example.out', 'w')
        while True:
            await asyncio.sleep(1.0)
            i += 1
            f.write('test output %d!\n' % (i,))
    finally:
        f.close()
        f.write('done\n')

async def async_main():
    task0 = asyncio.create_task(asyncio.sleep(10.0))
    task1 = asyncio.create_task(task_that_fails())
    task2 = asyncio.create_task(cleanup_task())
    done, pending = await asyncio.wait([task0, task1, task2], timeout=1.0)
    for t in pending:  # cancel anything running
        t.cancel()
    for t in done:
        await t

asyncio.run(async_main(), debug=True)
In this example, a function has been added that opens a file and writes to it until an exception halts it. This is important because what the .cancel() method on a task actually does is cause asyncio.CancelledError to be raised wherever the coroutine happens to be executing. So in this case, it causes the finally handler to run. I added a very elementary error to demonstrate what can happen next. The intent was to write a final line to the file and then close it. Instead, I did it backwards. This is a programming error, since you can't write to a closed file. The important takeaway here is that calling .cancel() on a task raises asyncio.CancelledError in the associated coroutine, but it does not mean the task fails with that same error.
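For comparison, here is a sketch of the cleanup coroutine with that error fixed: the final line is written before the file is closed, and since the CancelledError is allowed to propagate, the task finishes in the cancelled state instead of failing with a second, unrelated exception.

import asyncio

async def cleanup_task():
    i = 0
    with open('example.out', 'w') as f:
        try:
            while True:
                await asyncio.sleep(1.0)
                i += 1
                f.write('test output %d!\n' % (i,))
        finally:
            # Runs when .cancel() raises CancelledError at the await above.
            # The file is still open here; the with block closes it afterwards.
            f.write('done\n')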
When you run this code you get this monstrosity as output
Traceback (most recent call last):
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 28, in <module>
    asyncio.run(async_main(), debug=True)
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 26, in async_main
    await t
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 4, in task_that_fails
    raise RuntimeError('task failed')
RuntimeError: task failed
Task exception was never retrieved
future: <Task finished name='Task-4' coro=<cleanup_task() done, defined at /home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py:6> exception=ValueError('I/O operation on closed file.') created at /usr/lib/python3.9/asyncio/tasks.py:361>
source_traceback: Object created at (most recent call last):
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 28, in <module>
    asyncio.run(async_main(), debug=True)
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 634, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 601, in run_forever
    self._run_once()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 1897, in _run_once
    handle._run()
  File "/usr/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 21, in async_main
    task2 = asyncio.create_task(cleanup_task())
  File "/usr/lib/python3.9/asyncio/tasks.py", line 361, in create_task
    task = loop.create_task(coro)
Traceback (most recent call last):
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 11, in cleanup_task
    await asyncio.sleep(1.0)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 652, in sleep
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 16, in cleanup_task
    f.write('done\n')
ValueError: I/O operation on closed file.
This exception traceback is now larger than the example program! All the information is in fact there. Here's what the mess above is telling us:
- The task calling task_that_fails() raised an exception: RuntimeError: task failed
- A task encountered an exception and it was never waited on; in this case it is task2
- After asyncio.exceptions.CancelledError happened (caused by calling .cancel()), another exception, ValueError: I/O operation on closed file, happened.
This is getting pretty complex and difficult to deal with! How do we avoid the warning about Task exception was never retrieved? Well, to be sure, each task has to be awaited after it is cancelled. Alright, let's add code to await each task and print any exception that happened:
import asyncio
import traceback

async def task_that_fails():
    raise RuntimeError('task failed')

async def cleanup_task():
    i = 0
    try:
        f = open('example.out', 'w')
        while True:
            await asyncio.sleep(1.0)
            i += 1
            f.write('test output %d!\n' % (i,))
    finally:
        f.close()
        f.write('done\n')

async def async_main():
    task0 = asyncio.create_task(asyncio.sleep(10.0))
    task1 = asyncio.create_task(task_that_fails())
    task2 = asyncio.create_task(cleanup_task())
    tasks = [task0, task1, task2]
    done, pending = await asyncio.wait(tasks, timeout=1.0)
    for t in pending:  # cancel anything running
        t.cancel()
    for t in tasks:  # check all tasks
        try:
            await t
        except asyncio.CancelledError:
            pass  # don't care, expected this since .cancel() was called
        except BaseException:
            print("task failed:")
            traceback.print_exc()

asyncio.run(async_main(), debug=True)
This code now cancels everything in pending but also keeps a list of all tasks named tasks. After everything is cancelled, it checks each task. You can safely ignore any exception of type asyncio.CancelledError, since .cancel() was called on the task. Any other task exception is printed using the traceback module.
At this point, we now have 11 lines of code just to wait on some tasks, cancel them, and show any failures. This doesn't even include aggregating the results, as real world code would want to do. This pattern of asyncio.wait followed by .cancel() and then await on each task proliferates everywhere in async Python code.
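Before reaching for a library, you could fold the pattern into a helper of your own. Here is a rough sketch of that idea; the name wait_and_cancel and its return value are my own invention, not part of asyncio:

import asyncio
import traceback

async def wait_and_cancel(tasks, timeout):
    # Wait up to `timeout` seconds, cancel whatever is still pending,
    # then drain every task so no exception goes unretrieved.
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for t in pending:
        t.cancel()
    for t in tasks:
        try:
            await t
        except asyncio.CancelledError:
            pass  # expected for the tasks we just cancelled
        except BaseException:
            print("task failed:")
            traceback.print_exc()
    return done, pending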
My solution
As you can imagine, I wanted to avoid this pattern entirely. What I opted to do was implement versions of each of the base functions from the asyncio package that guarantee every task has either completed or been cancelled. I added these to a library I had developed in the past called multi_await. So I implemented these functions:
- asyncio.gather -> multi_await.gather_or_abort
- asyncio.wait -> multi_await.wait_or_abort
- asyncio.as_completed -> multi_await.as_completed_or_abort
Whenever a task fails with an exception or a timeout occurs, these wrappers cancel all the other tasks that are still running. To cancel the other tasks, I just use a method from the aiotk module called aiotk.cancel_all.
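Using the first gather example from above, the replacement looks roughly like this (a sketch, keeping the same call shape as asyncio.gather):

import asyncio
import multi_await

async def task_that_fails():
    raise RuntimeError('task failed')

async def async_main():
    task0 = asyncio.create_task(asyncio.sleep(10.0))
    task1 = asyncio.create_task(task_that_fails())
    # Assuming gather_or_abort mirrors asyncio.gather: when task1 raises,
    # the wrapper cancels task0 before propagating the exception, so
    # nothing is left running in the background.
    await multi_await.gather_or_abort(task0, task1)

asyncio.run(async_main())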
This allows simple code to be written like this
import multi_await
import aiohttp
import asyncio

async def async_main():
    urls = ['http://archive.org', 'http://localhost', 'http://fsf.org']
    async with aiohttp.ClientSession() as session:
        for task in multi_await.as_completed_or_abort([asyncio.create_task(session.get(x)) for x in urls]):
            result = await task
            await result.read()
            print(result.status)

asyncio.run(async_main())
In this example, the aiohttp module is used to make HTTP requests to 3 different domains. Unless you have an HTTP server running locally, the second request fails because it tries to load content from localhost. You don't have to worry about the other two tasks that were created. This code always cancels those tasks when the request to localhost throws an exception.
You can get the module from PyPI here: https://pypi.org/project/multi-await/.
References
I found the following pieces helpful while solving this problem: