
Exception Handling in asyncio


Foreword: This is part 3 of a 7-part series titled “asyncio: We Did It Wrong.” Take a look at Part 1: True Concurrency and Part 2: Graceful Shutdowns for where we are in the tutorial now. Once done, follow along with Part 4: Working with Synchronous & Threaded Code, or skip ahead to Part 5: Testing asyncio Code, Part 6: Debugging asyncio Code, or Part 7: Profiling asyncio Code.

Example code can be found on GitHub. All code in this post is licensed under MIT.


Mayhem Mandrill Recap

The goal for this 7-part series is to build a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

At the end of part 2, our service looked like this:

#!/usr/bin/env python3.7
"""
Notice! This requires:
 - attrs==19.1.0
"""

import asyncio
import logging
import random
import signal
import string
import uuid

import attr

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)
    saved         = attr.ib(repr=False, default=False)
    acked         = attr.ib(repr=False, default=False)
    extended_cnt  = attr.ib(repr=False, default=0)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.debug(f"Published message {msg}")
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")


async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")


async def cleanup(msg, event):
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.acked = True
    logging.info(f"Done. Acked {msg}")


async def extend(msg, event):
    while not event.is_set():
        msg.extended_cnt += 1
        logging.info(f"Extended deadline by 3 seconds for {msg}")
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)


async def handle_message(msg):
    event = asyncio.Event()
    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(save(msg), restart_host(msg))
    event.set()

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.create_task(handle_message(msg))

async def shutdown(signal, loop):
    logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info(f"Flushing metrics")
    loop.stop()

def main():
    loop = asyncio.get_event_loop()
    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    queue = asyncio.Queue()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")

if __name__ == "__main__":
    main()

Global Exception Handling

You may have noticed that we haven’t been handling any exceptions. Let’s take a look at what happens if consuming a message fails:

async def consume(queue):
    while True:
        msg = await queue.get()
        # totally realistic exception
        if random.randrange(1, 5) == 3:
            raise Exception(f"Could not consume {msg}")
        logging.info(f"Consumed {msg}")
        asyncio.create_task(handle_message(msg))

Running this:

$ python part-3/mayhem_1.py
14:25:43,943 ERROR: Task exception was never retrieved
future: <Task finished coro=<consume() done, defined at part-3/mayhem_1.py:140> exception=Exception("Could not consume PubSubMessage(instance_name='cattle-gw75')")>
Traceback (most recent call last):
  File "part-3/mayhem_1.py", line 150, in consume
    raise Exception(f"Could not consume {msg}")
Exception: Could not consume PubSubMessage(instance_name='cattle-gw75')
^C14:25:46,949 INFO: Received exit signal SIGINT...
14:25:46,950 INFO: Closing database connections
14:25:46,950 INFO: Nacking outstanding messages
14:25:46,950 INFO: Cancelling 1 outstanding tasks
14:25:46,950 INFO: Flushing metrics
14:25:46,951 INFO: Successfully shutdown the Mayhem service.

It just hangs, so we have to interrupt it. We do see the exception, Exception: Could not consume PubSubMessage(instance_name='cattle-gw75'), but before that there’s an error: Task exception was never retrieved. This is admittedly a part of the asyncio API that isn’t that friendly. If this were synchronous code, we’d simply see the raised exception and the program would error out. But here it gets swallowed up into an unretrieved task exception, and the script itself looks like it hangs. If we changed the log line in the publish coroutine to the info level, we’d see that we’re actually not hanging; we’re still able to publish messages.
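
To make “retrieved” concrete: the exception lives on the Task object we created in main(), and asyncio only logs the warning above once that task is garbage collected without anyone having asked for its result. One way to retrieve it by hand is a done callback; this is just a sketch (handle_task_result is a made-up helper, not something we’ll end up using):

def handle_task_result(task):
    try:
        task.result()  # re-raises the task's exception, if there was one
    except asyncio.CancelledError:
        pass  # cancellation during shutdown is expected
    except Exception as e:
        logging.error(f"Caught exception from task: {e}")

# inside main(), attach the callback to the consumer task
consumer = loop.create_task(consume(queue))
consumer.add_done_callback(handle_task_result)

Attaching a callback to every task we create gets unwieldy fast, though, which is where a global handler comes in.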

We’re not handling exceptions appropriately. To deal with this, as advised in the asyncio documentation, we should add a global/default exception handler:

def handle_exception(loop, context):
    # context["message"] will always be there; but context["exception"] may not
    msg = context.get("exception", context["message"])
    logging.error(f"Caught exception: {msg}")
    logging.info("Shutting down...")
    asyncio.create_task(shutdown(loop))

Our exception handler function must take in two arguments, loop and context (if you want to give it other arguments, use functools.partial). We’re also going to call our shutdown coroutine function because, at this point, something serious enough is going on that we can’t consume messages, and we should probably fail hard.
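
For instance, if we wanted the handler to also know about the queue, a minimal sketch using functools.partial might look like this (binding the queue is purely an illustration, not something the service needs):

import functools

def handle_exception(queue, loop, context):
    # `queue` is now available here, e.g. to inspect or drain unconsumed messages
    # context["message"] will always be there; but context["exception"] may not
    msg = context.get("exception", context["message"])
    logging.error(f"Caught exception: {msg}")
    logging.info("Shutting down...")
    asyncio.create_task(shutdown(loop))

# inside main(), bind the extra argument before attaching the handler;
# the loop still calls the handler with (loop, context)
handle_exc_func = functools.partial(handle_exception, queue)
loop.set_exception_handler(handle_exc_func)

The rest of this post sticks with the two-argument version above.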

We’re going to update the shutdown coroutine function so that the signal parameter is optional. This makes it clear whether we’re shutting down from “within” (because of an exception) or from an external signal.

async def shutdown(loop, signal=None):
    """Cleanup tasks tied to the service's shutdown."""
    if signal:
        logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    # <-- snip -->

We then attach handle_exception to the loop, and take the opportunity to update the creation of shutdown tasks to use the new function signature:

def main():
    loop = asyncio.get_event_loop()
    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s)))
    loop.set_exception_handler(handle_exception)

    # <-- snip -->

Let’s see how it looks:

$ python part-3/mayhem_2.py
14:34:51,123 INFO: Consumed PubSubMessage(instance_name='cattle-zgc7')
14:34:51,123 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-zgc7')
14:34:51,192 INFO: Consumed PubSubMessage(instance_name='cattle-7j5h')
14:34:51,192 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-7j5h')
14:34:51,395 INFO: Saved PubSubMessage(instance_name='cattle-7j5h') into database
14:34:51,658 INFO: Restarted cattle-zgc7.example.net
14:34:51,878 INFO: Restarted cattle-7j5h.example.net
14:34:51,891 INFO: Saved PubSubMessage(instance_name='cattle-zgc7') into database
14:34:52,165 ERROR: Caught exception: Could not consume PubSubMessage(instance_name='cattle-1ney')
14:34:52,165 INFO: Shutting down...
14:34:52,165 INFO: Closing database connections
14:34:52,165 INFO: Nacking outstanding messages
14:34:52,165 INFO: Cancelling 5 outstanding tasks
14:34:52,166 INFO: Flushing metrics
14:34:52,166 INFO: Successfully shutdown the Mayhem service.

Now it’s clear that we’ve handled the exception, and we do not get the error Task exception was never retrieved.

Specific Handlers

You may have noticed that, while we’re catching exceptions at the top level, we may not want to handle all exceptions the same way.

Say, for some reason, we’re okay with just logging an error if save fails. But if restarting a host fails, maybe we want to retry, or “nack” (not acknowledge) the Pub/Sub message so it can get re-delivered.

Let’s first define a specific exception for ourselves:

class RestartFailed(Exception):
    pass

And then incorporate it into our restart_host coroutine function:

async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    # totally realistic exception
    if random.randrange(1, 5) == 3:
        raise RestartFailed(f"Could not restart {msg.hostname}")
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")

And we’ll add a basic exception to save:

async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    # totally realistic exception
    if random.randrange(1, 5) == 3:
        raise Exception(f"Could not save {msg}")
    msg.saved = True
    logging.info(f"Saved {msg} into database")

If we don’t do anything more, this will trigger the global exception handler:

$ python part-3/mayhem_3.py
14:45:13,336 INFO: Consumed PubSubMessage(instance_name='cattle-eoj8')
14:45:13,337 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-eoj8')
14:45:13,945 INFO: Saved PubSubMessage(instance_name='cattle-eoj8') into database
14:45:14,234 INFO: Restarted cattle-eoj8.example.net
14:45:14,250 INFO: Consumed PubSubMessage(instance_name='cattle-l90o')
14:45:14,251 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-l90o')
14:45:14,350 INFO: Saved PubSubMessage(instance_name='cattle-l90o') into database
14:45:14,404 ERROR: Caught exception: Could not restart cattle-l90o.example.net
14:45:14,404 INFO: Shutting down...
14:45:14,405 INFO: Closing database connections
14:45:14,406 INFO: Nacking outstanding messages
14:45:14,406 INFO: Cancelling 6 outstanding tasks
14:45:14,407 INFO: Flushing metrics
14:45:14,408 INFO: Successfully shutdown the Mayhem service.

Of course the global exception handler on the loop handles this, but there is something nuanced going on. Within handle_message, we use asyncio.gather, whose return_exceptions argument defaults to False. When it is False,

the first raised exception is immediately propagated to the task that awaits on gather()

which then triggers our global handle_exception function and then the shutdown coroutine.

If we didn’t shut down when an exception was raised, gather would continue to process the other tasks/coroutines it was given. But the tasks created from a message that errored out would never complete. Because the exception is raised rather than returned with the results, it propagates out of handle_message, and we never hit the event.set() line.

Let me reiterate: handle_message will never complete when a message errors.

To show you what I mean, if I remove the call to shutdown in our global exception handler:

$ python part-3/mayhem_4.py
14:49:37,539 INFO: Consumed PubSubMessage(instance_name='cattle-703c')
14:49:37,539 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-703c')
14:49:37,817 ERROR: Caught exception: Could not save PubSubMessage(instance_name='cattle-703c')
14:49:39,541 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-703c')
14:49:41,542 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-703c')
# and so on

We are essentially spinning around this Pub/Sub message. We’re extending the deadline of this message and never getting to cleanup.

To address this, we can pass return_exceptions=True to asyncio.gather so that exceptions are treated like successful results and returned in the results list, rather than raised. We’ll then have a specific exception handler:

def handle_results(results, msg):
    for result in results:
        if isinstance(result, RestartFailed):
            logging.error(f"Retrying for failure to restart: {msg.hostname}")
        elif isinstance(result, Exception):
            logging.error(f"Handling general error: {result}")

async def handle_message(msg):
    event = asyncio.Event()
    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    results = await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True
    )
    handle_results(results, msg)
    event.set()

When we run this updated code, we see that we no longer spin around a message that errors:

$ python part-3/mayhem_5.py
14:58:15,750 INFO: Consumed PubSubMessage(instance_name='cattle-jmtv')
14:58:15,981 INFO: Saved PubSubMessage(instance_name='cattle-jmtv') into database
14:58:15,981 ERROR: Retrying for failure to restart: cattle-jmtv.example.net
14:58:16,873 INFO: Restarted cattle-hukm.example.net
14:58:16,903 INFO: Done. Acked PubSubMessage(instance_name='cattle-jmtv')

Much better!
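
As an aside, if we actually wanted to retry the restart rather than just log that we would, one possibility is to wrap restart_host before handing it to gather. This is only a sketch; restart_host_with_retries and max_retries are made up for illustration:

async def restart_host_with_retries(msg, max_retries=3):
    # hypothetical wrapper: attempt the restart a few times before giving up
    for attempt in range(1, max_retries + 1):
        try:
            return await restart_host(msg)
        except RestartFailed:
            logging.warning(
                f"Restart attempt {attempt}/{max_retries} failed for {msg.hostname}"
            )
    # surface the final failure so handle_results can still decide what to do
    raise RestartFailed(f"Could not restart {msg.hostname} after {max_retries} attempts")

handle_message would then gather on restart_host_with_retries(msg) instead of restart_host(msg), and handle_results would only see a RestartFailed once the retries are exhausted.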

Recap

Exceptions will not crash the system, unlike in non-asyncio programs, and they might go unnoticed, as we saw with return_exceptions=False, where a failure to save a message caused us to never reach cleanup.

Be sure to set some sort of exception handling – either globally with loop.set_exception_handler, individually like in our handle_results function, or a mix of both (you’ll probably need a mix).

I personally like using asyncio.gather because the order of the returned results is deterministic, but it’s easy to get tripped up by it. When one awaitable raises, gather will happily keep working on the others it was given, and with return_exceptions=True the exception is tucked away in the results list instead of raised. If an exception is never retrieved or checked, weird behavior can happen like we saw.
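
Here’s a tiny, self-contained illustration of the two modes, separate from our service:

import asyncio

async def ok():
    return "ok"

async def boom():
    raise RuntimeError("boom")

async def demo():
    # default (return_exceptions=False): the first exception propagates to the
    # awaiter, but the other awaitables still run to completion
    try:
        await asyncio.gather(ok(), boom())
    except RuntimeError as e:
        print(f"raised: {e}")

    # return_exceptions=True: exceptions come back in the results list,
    # in the same order as the awaitables that were passed in
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    print(results)  # ['ok', RuntimeError('boom')]

asyncio.run(demo())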


Follow the next part of this series for working with synchronous and threaded code, testing, debugging, and profiling asyncio code.





