Initial Setup with asyncio

Foreword: This is part 0 of a 7-part series titled “asyncio: We Did It Wrong.” Once done, follow along with Part 1: True Concurrency, Part 2: Graceful Shutdowns, Part 3: Exception Handling, Part 4: Working with Synchronous & Threaded Code, Part 5: Testing asyncio Code, Part 6: Debugging asyncio Code, and Part 7: Profiling asyncio Code.

Example code can be found on GitHub. All code in this post is licensed under MIT.


Goal: Mayhem Mandrill

To recap from the intro, we are building a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

Foundations for a Pub/Sub

There are a lot of choices for pub/sub-like technologies out there; I’m most familiar with Google Cloud Pub/Sub. But for our purposes, we’ll simulate a pub/sub with asyncio, inspired by this official-looking tutorial using asyncio.Queues.

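Disclaimer: Using f-strings within log messages like I do may not be ideal: no matter what the log level is set to, an f-string is always evaluated before the logging call is made, whereas passing the arguments separately (logging.info('foo %s', 'bar')) defers the formatting until the record is actually emitted. Note that 'foo %s' % 'bar' is evaluated just as eagerly as an f-string; only the argument-passing form is lazy.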

But I just love f-strings.
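To make the trade-off concrete, here is a minimal sketch. The Expensive class and the 'mayhem.demo' logger name are hypothetical, invented only to count how often formatting happens when the log level filters the message out.

```python
import logging


class Expensive:
    """Hypothetical object that counts how often it is turned into a string."""

    def __init__(self):
        self.str_calls = 0

    def __str__(self):
        self.str_calls += 1
        return 'expensive'


logger = logging.getLogger('mayhem.demo')  # hypothetical logger name
logger.setLevel(logging.INFO)  # DEBUG records are dropped

eager = Expensive()
# The f-string is built before debug() is even called.
logger.debug(f'Consumed {eager}')

lazy = Expensive()
# The argument is only formatted if the record is actually emitted.
logger.debug('Consumed %s', lazy)

print(eager.str_calls, lazy.str_calls)
```

The eager object is formatted once even though nothing is logged; the lazy one is never formatted. For cheap objects like PubSubMessage the difference is negligible, which is why f-strings are used throughout this series.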

#!/usr/bin/env python3.7
"""
Notice! This requires: 
 - attrs==19.1.0
"""

import asyncio
import logging
import random
import string

import attr


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


# simulating an external publisher of events
async def publish(queue, n):
    choices = string.ascii_lowercase + string.digits

    for x in range(1, n + 1):
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=x, instance_name=instance_name)
        await queue.put(msg)
        logging.info(f'Published {x} of {n} messages')

    await queue.put(None)  # publisher is done


async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        # process the msg
        logging.info(f'Consumed {msg}')
        # unhelpful simulation of i/o work
        await asyncio.sleep(random.random())


def main():
    queue = asyncio.Queue()
    asyncio.run(publish(queue, 5))
    asyncio.run(consume(queue))


if __name__ == '__main__':
    main()

When we run this, we see:

$ python part-0/mayhem_1.py
18:38:02,124 INFO: Published 1 of 5 messages
18:38:02,124 INFO: Published 2 of 5 messages
18:38:02,124 INFO: Published 3 of 5 messages
18:38:02,124 INFO: Published 4 of 5 messages
18:38:02,124 INFO: Published 5 of 5 messages
18:38:02,124 INFO: Consumed PubSubMessage(instance_name='cattle-pdcg')
18:38:02,188 INFO: Consumed PubSubMessage(instance_name='cattle-nbs9')
18:38:02,952 INFO: Consumed PubSubMessage(instance_name='cattle-hw4f')
18:38:03,075 INFO: Consumed PubSubMessage(instance_name='cattle-bza9')
18:38:03,522 INFO: Consumed PubSubMessage(instance_name='cattle-gjl4')

We’ll use this as the starting point for a pub/sub simulator.

Running an asyncio-based Service

So far, we don’t have a running service; it’s merely a pipeline or a batch job right now.

Also note that asyncio.run is new as of Python 3.7:

# <---snip--->
def main():
    queue = asyncio.Queue()
    asyncio.run(publish(queue, 5))
    asyncio.run(consume(queue))

Before 3.7, the boilerplate code to run the event loop was like so:

# <---snip--->
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(publish(queue, 5))
    loop.run_until_complete(consume(queue))
    loop.close()
    logging.info("Successfully shutdown the Mayhem service.")

Basically, we had to set up and tear down the loop ourselves. Note that it is a good habit to clean up and close the event loop, since we are the ones who obtained it.
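As a rough illustration of that setup and teardown, the sketch below wraps the pre-3.7 boilerplate in a helper. The names run_once and answer are hypothetical, and the sketch uses asyncio.new_event_loop() rather than asyncio.get_event_loop() so that it does not depend on a pre-existing loop. It is only an approximation of what asyncio.run does; the real function also cancels leftover tasks and shuts down async generators.

```python
import asyncio


async def answer():
    # Hypothetical stand-in for publish() or consume().
    await asyncio.sleep(0)
    return 42


def run_once(coro):
    """Create a loop, run one coroutine to completion, and always close the loop."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro), loop
    finally:
        # Runs even if the coroutine raises, so the loop is never leaked.
        loop.close()


result, loop = run_once(answer())
print(result, loop.is_closed())
```

Putting loop.close() in a finally block is the design choice that matters here: the loop is closed whether the coroutine returns or raises.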

But this is a service, so we don’t want it to run just once; it should continually consume from a publisher. Unfortunately, Python 3.7 doesn’t offer a decent way to start a long-running service that is not an HTTP server. So we’ll stick with the pre-3.7 boilerplate.

With that, we’ll create tasks out of the two coroutines using loop.create_task, which schedules them on the loop (asyncio.create_task would not work here, since it requires a loop that is already running). Then we start the loop, telling it to run forever.

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.create_task(publish(queue, 5))
    loop.create_task(consume(queue))
    loop.run_forever()
    loop.close()
    logging.info("Successfully shutdown the Mayhem service.")
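One thing worth knowing about run_forever: it does not return when the scheduled tasks finish. It returns only once something calls loop.stop() (or an exception such as KeyboardInterrupt escapes it). The sketch below is a hypothetical, self-contained demonstration that uses loop.call_later to stop the loop after a short delay; short_task is a stand-in for publish and consume, and stopping on a timer is purely for illustration, not something Mayhem Mandrill does.

```python
import asyncio


async def short_task(done):
    # Hypothetical stand-in for publish()/consume(): finishes immediately.
    done.append('finished')


loop = asyncio.new_event_loop()
done = []
task = loop.create_task(short_task(done))

# Without this call, run_forever() would keep waiting even though
# the only task is long finished.
loop.call_later(0.1, loop.stop)
loop.run_forever()

# Stopping the loop does not close it; that is still our job.
still_open = not loop.is_closed()
loop.close()
print(done, task.done(), still_open)
```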

When running with this updated code, we see that all messages are published and then consumed. Then we hang because there is no more work to be done; we only published 5 messages, after all. To stop the “hanging” process, we must interrupt it (via ^C or sending a signal like kill -15 <pid>):

$ python part-0/mayhem_3.py
13:48:43,797 INFO: Published 1 of 5 messages
13:48:43,797 INFO: Published 2 of 5 messages
13:48:43,797 INFO: Published 3 of 5 messages
13:48:43,797 INFO: Published 4 of 5 messages
13:48:43,797 INFO: Published 5 of 5 messages
13:48:43,797 INFO: Consumed PubSubMessage(instance_name='cattle-940o')
13:48:43,860 INFO: Consumed PubSubMessage(instance_name='cattle-ebeq')
13:48:43,875 INFO: Consumed PubSubMessage(instance_name='cattle-vuxk')
13:48:44,733 INFO: Consumed PubSubMessage(instance_name='cattle-cfxt')
13:48:45,277 INFO: Consumed PubSubMessage(instance_name='cattle-i54x')
^CTraceback (most recent call last):
  File "part-0/mayhem_3.py", line 91, in <module>
    main()
  File "part-0/mayhem_3.py", line 85, in main
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt

That’s nice and …ugly. You may notice that we never get to the log line of "Successfully shutdown the Mayhem service."; because the KeyboardInterrupt propagates out of loop.run_forever(), nothing after that call runs, so we are not closing the loop either. Let’s add a quick fix.

Running the event loop defensively

Our main function is just typical boilerplate code to get the service running. We create a queue instance, set up the loop, schedule the publish & consume tasks, then start and finally close the event loop:

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.create_task(publish(queue, 5))
    loop.create_task(consume(queue))
    loop.run_forever()
    loop.close()
    logging.info("Successfully shutdown the Mayhem service.")

We’ve been stopping our service with CTRL-C, which sends SIGINT, which Python surfaces as a KeyboardInterrupt. So let’s wrap our running of the service in a try/except/finally around that interrupt:

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue, 5))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")


if __name__ == "__main__":
    main()

And running it again, sending CTRL-C after it is done consuming:

$ python part-0/mayhem_4.py
14:38:31,394 INFO: Published 1 of 5 messages
14:38:31,394 INFO: Published 2 of 5 messages
14:38:31,394 INFO: Published 3 of 5 messages
14:38:31,395 INFO: Published 4 of 5 messages
14:38:31,395 INFO: Published 5 of 5 messages
14:38:31,395 INFO: Consumed PubSubMessage(instance_name='cattle-yyfk')
14:38:31,956 INFO: Consumed PubSubMessage(instance_name='cattle-hppg')
14:38:32,740 INFO: Consumed PubSubMessage(instance_name='cattle-vlxt')
14:38:33,145 INFO: Consumed PubSubMessage(instance_name='cattle-1qww')
14:38:34,69 INFO: Consumed PubSubMessage(instance_name='cattle-vwfo')
^C14:38:35,991 INFO: Process interrupted
14:38:35,992 INFO: Successfully shutdown the Mayhem service.

This is clean enough for now, but later on we’ll build on this by adding a graceful shutdown in part 2 and proper exception handling in part 3.
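The except/finally path can also be exercised without pressing CTRL-C. The sketch below is a hypothetical test harness, not part of the Mayhem code: raise_interrupt is a made-up callback that raises KeyboardInterrupt from inside the running loop, which then propagates out of run_forever much as a real interrupt does.

```python
import asyncio


def raise_interrupt():
    # Hypothetical stand-in for the user pressing CTRL-C while the loop runs.
    raise KeyboardInterrupt


events = []
loop = asyncio.new_event_loop()
try:
    loop.call_later(0.05, raise_interrupt)
    loop.run_forever()
    events.append('returned normally')  # not reached in this sketch
except KeyboardInterrupt:
    events.append('interrupted')
finally:
    # Mirrors main(): the loop is closed no matter how run_forever() exits.
    loop.close()
    events.append('closed')

print(events)
```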


Continue to the next part of this series, Part 1: true concurrency, or skip ahead to Part 2: adding a graceful shutdown, Part 3: exception handling, Part 4: working with synchronous and threaded code, Part 5: testing, Part 6: debugging, or Part 7: profiling asyncio code.



