Python asyncio stumbling block: aborting tasks

I use Python's async/await features in all kinds of software. If you have multiple things you want to do with a script concurrently then it is an easy choice. As long as your task isn't CPU bound, it works well.

But one thing I have consistently ran into is managing a bunch of asyncio tasks. By default, any async method in Python is a coroutine. When you want to run many coroutines simultaneously and wait on the result you need to convert them to tasks using asyncio.create_task. A list of these tasks is then passed to a function like asyncio.wait. This leads to an embarassingly common problem and is easy to demonstrate:

import asyncio

async def task_that_fails():
  raise RuntimeError('task failed')

async def async_main():
  task0 = asyncio.create_task(asyncio.sleep(10.0))
  task1 = asyncio.create_task(task_that_fails())
  done, pending = await asyncio.wait([task0, task1], timeout=1.0)
  for t in done:
    await t

asyncio.run(async_main())

If you think would output a mesage about RuntimeError: task failed, it does. The first task, task0 is waiting on asyncio.sleep(10.0) to complete. This takes at least 10 seconds. The second task task1 fails immediately by raising an exception. The call to asyncio.wait specifies a timeout of no more than 1 second. So after one second, asyncio.wait returns. In the first set called done is task1 but the second set pending has task0 in it.

The exception from task1 propagates when await t is called inside the for loop.

Traceback (most recent call last):
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example.py", line 13, in <module>
    asyncio.run(async_main())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example.py", line 11, in async_main
    await t
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example.py", line 4, in task_that_fails
    raise RuntimeError('task failed')
RuntimeError: task failed

So what happens to task0? In this example, nothing at all. It's still running at the time the interpreter exits. In this small example, it isn't an issue. But if you have a long running program, it means task0 would still be running and doing things. This probably isn't the intent of the code, especially since there is no easy way to reference task0.

One common solution is to replace asyncio.wait with asyncio.gather like this:

import asyncio

async def task_that_fails():
  raise RuntimeError('task failed')

async def async_main():
  task0 = asyncio.create_task(asyncio.sleep(10.0))
  task1 = asyncio.create_task(task_that_fails())
  await asyncio.gather(task0, task1)

asyncio.run(async_main())

This does basically the same thing, except asyncio.gather returns immediately when the exception occurs in task1. In both cases task0 is left running. Also, you can't specify a timeout with asyncio.gather. This means that the code waits potentially forever for the tasks to complete in the case of tasks that are working, but are very slow.

So how do we handle this? Well, any task created with asyncio.create_task has a .cancel() function that can be used to cancel it. So, let's refactor the original example to cancel any pending tasks.

import asyncio

async def task_that_fails():
  raise RuntimeError('task failed')

async def async_main():
  task0 = asyncio.create_task(asyncio.sleep(10.0))
  task1 = asyncio.create_task(task_that_fails())
  done, pending = await asyncio.wait([task0, task1], timeout=1.0)
  for t in pending: # cancel anything running
    t.cancel()
  for t in done:
    await t

asyncio.run(async_main(), debug=True)

By iterating over pending, each task that is still running can be cancelled. In this case, only task0 is in pending. Great! Now consider this more interesting example.

import asyncio

async def task_that_fails():
  raise RuntimeError('task failed')

async def cleanup_task():
  i = 0
  try:
    f = open('example.out', 'w')
    while True:
      await asyncio.sleep(1.0)
      i += 1
      f.write('test output %d!\n' % (i,))
  finally:
    f.close()
    f.write('done\n')

async def async_main():
  task0 = asyncio.create_task(asyncio.sleep(10.0))
  task1 = asyncio.create_task(task_that_fails())
  task2 = asyncio.create_task(cleanup_task())
  done, pending = await asyncio.wait([task0, task1, task2], timeout=1.0)
  for t in pending: # cancel anything running
    t.cancel()
  for t in done:
    await t

asyncio.run(async_main(), debug=True)

In this example, a function has been added that opens a file and writes to it until an exception halts it. This is important because what the .cancel() method on a task is actually causes asyncio.CancelledError to be raised wherever the coroutine happens to be executing. So in this case, it causes the finally handler to run. I added a very elementary error to demonstrate what can happen next. The intent was to write a final line to the file then close it. Instead, I did it backwards. This is a programming error since you can't write to a closed file. The important takeaway here is that calling .cancel() on a task raises asyncio.CancelledError in the associated coroutine but it does not mean the task fails with that same error.

When you run this code you get this monstrosity as output

Traceback (most recent call last):
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 28, in <module>
    asyncio.run(async_main(), debug=True)
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 26, in async_main
    await t
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 4, in task_that_fails
    raise RuntimeError('task failed')
RuntimeError: task failed
Task exception was never retrieved
future: <Task finished name='Task-4' coro=<cleanup_task() done, defined at /home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py:6> exception=ValueError('I/O operation on closed file.') created at /usr/lib/python3.9/asyncio/tasks.py:361>
source_traceback: Object created at (most recent call last):
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 28, in <module>
    asyncio.run(async_main(), debug=True)
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 634, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 601, in run_forever
    self._run_once()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 1897, in _run_once
    handle._run()
  File "/usr/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 21, in async_main
    task2 = asyncio.create_task(cleanup_task())
  File "/usr/lib/python3.9/asyncio/tasks.py", line 361, in create_task
    task = loop.create_task(coro)
Traceback (most recent call last):
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 11, in cleanup_task
    await asyncio.sleep(1.0)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 652, in sleep
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ericu/www.hydrogen18.com/site/python-asyncio-stumbling-blocks-aborting-tasks/abort_task_example_and_cancel_with_error.py", line 16, in cleanup_task
    f.write('done\n')
ValueError: I/O operation on closed file.

This exception traceback is now larger than the example program! All the information is in fact there. Here's what the mess above is telling us

  1. The task calling task_that_fails() raised an exception RuntimeError: task failed
  2. A task encountered an exception and it was never waited on, in this case it is task2
  3. After asyncio.exceptions.CancelledError happened (caused by calling .cancel()), another exception ValueError: I/O operation on closed file happened.

This is getting pretty complex and difficult to deal with! How do we avoid the warning around Task exception was never retrieved? Well, each task has to be awaited after it is cancelled to be sure. Alright, let's add code to await each task and print an exception if it happened

import asyncio
import traceback

async def task_that_fails():
  raise RuntimeError('task failed')

async def cleanup_task():
  i = 0
  try:
    f = open('example.out', 'w')
    while True:
      await asyncio.sleep(1.0)
      i += 1
      f.write('test output %d!\n' % (i,))
  finally:
    f.close()
    f.write('done\n')

async def async_main():
  task0 = asyncio.create_task(asyncio.sleep(10.0))
  task1 = asyncio.create_task(task_that_fails())
  task2 = asyncio.create_task(cleanup_task())
  tasks = [task0, task1, task2]
  done, pending = await asyncio.wait(tasks, timeout=1.0)
  for t in pending: # cancel anything running
    t.cancel()
  for t in tasks: # check all tasks
    try:
      await t
    except asyncio.CancelledError:
      pass # don't care, expected this since .cancel() was called
    except BaseException:
      print("task failed:")
      traceback.print_exc()

asyncio.run(async_main(), debug=True)

This code now cancels everything in pending but also keeps a list of all tasks named tasks. After everything is cancelled, it checks each task. You can safely ignore any exception that is of type asyncio.CancelledError since .cancel() was called on the task. Any other task exception is printed using the traceback module.

At this point, we now have 11 lines of code just to wait on some tasks, cancel them, and show any failures. This doesn't even include aggregating up the results like real world code would want to do. This pattern of asyncio.wait followed by .cancel() then await on each task proliferates everywhere in async Python code.

My solution

As you can imagine, I wanted to avoid this pattern entirely. What I opted to do was to implement versions of each of the base functions from the asyncio package that are able to guarantee all tasks completed or that all tasks were cancelled. I added these into a library I had developed in the past called multi_await. So I implemented these functions:

  1. asyncio.gather -> multi_await.gather_or_abort
  2. asyncio.wait -> multi_await.wait_or_abort
  3. asyncio.as_completed -> multi-await.as_completed_or_abort

Whenever a task fails with an exception or a timeout occurs, these wrappers cancel all the other tasks that are still running. For the code to cancel all the other tasks I just use a method from the aiotk module called aiotk.cancel_all.

This allows simple code to be written like this

import multi_await
import aiohttp
import asyncio

async def async_main():
  urls = ['http://archive.org', 'http://localhost', 'http://fsf.org']
  async with aiohttp.ClientSession() as session:
    for task in multi_await.as_completed_or_abort([asyncio.create_task(session.get(x)) for x in urls]):
      result = await task
      await result.read()
      print(result.status)

asyncio.run(async_main())

In this example, the aiohttp module is used to make HTTP requests to 3 different domains. Unless you have an HTTP server running locally, the second request fails because it tries to load contact from localhost. You don't have to worry about the other two tasks that were created. This code always cancels those tasks when the request to localhost throws an exception.

You can get the module from PyPi here https://pypi.org/project/multi-await/.

References

I found the following pieces helpful while solving this problem

  1. asyncio cancel all tasks on first task's exception
  2. How to cancel all remaining tasks in gather if one fails?
  3. Asyncio Coroutine Patterns: Errors and cancellation
  4. Waiting in asyncio

Copyright Eric Urban 2022, or the respective entity where indicated