Initial Setup with asyncio
Foreword: This is part 0 of a 7-part series titled “asyncio: We Did It Wrong.” Once done, follow along with Part 1: True Concurrency, Part 2: Graceful Shutdowns, Part 3: Exception Handling, Part 4: Working with Synchronous & Threaded Code, Part 5: Testing asyncio Code, Part 6: Debugging asyncio Code, and Part 7: Profiling asyncio Code.
Example code can be found on GitHub. All code on this post is licensed under MIT.
Goal: Mayhem Mandrill
To recap from the intro, we are building a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.
Foundations for a Pub/Sub
There are a lot of choices for pub/sub-like technologies out there; I’m most familiar with Google Cloud Pub/Sub. But for our purposes, we’ll simulate a pub/sub with asyncio, inspired by this official-looking tutorial using asyncio.Queues.
Disclaimer: Using f-strings within log messages like I do may not be ideal: no matter what the log level is set at, f-strings will always be evaluated; whereas passing the arguments to the logging call (logging.info('foo %s', 'bar')) defers the formatting until the record is actually emitted. But I just love f-strings.
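To make the disclaimer above concrete, here is a small sketch of the difference. The Expensive class and the logger name mayhem.lazy_demo are made up for illustration; the only goal is to count how often an object gets rendered to a string when the DEBUG record is never emitted. Note that the % operator formats eagerly, just like an f-string; it is handing the arguments to the logging call that defers the work.

```python
import logging


class Expensive:
    """Hypothetical helper that counts how often it is rendered to a string."""

    def __init__(self):
        self.renders = 0

    def __str__(self):
        self.renders += 1
        return "expensive value"


logger = logging.getLogger("mayhem.lazy_demo")  # made-up logger name
logger.setLevel(logging.INFO)  # DEBUG records are dropped by this logger

lazy = Expensive()
logger.debug("value: %s", lazy)  # args are only formatted if the record is emitted

eager = Expensive()
logger.debug(f"value: {eager}")  # rendered before debug() is even called

percent = Expensive()
logger.debug("value: %s" % percent)  # the % operator also renders up front

print(lazy.renders, eager.renders, percent.renders)  # prints: 0 1 1
```

For a handful of log lines the cost is negligible, which is why the rest of this post sticks with f-strings.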
#!/usr/bin/env python3.7
"""
Notice! This requires:
- attrs==19.1.0
"""
import asyncio
import logging
import random
import string

import attr

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id = attr.ib(repr=False)
    hostname = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


# simulating an external publisher of events
async def publish(queue, n):
    choices = string.ascii_lowercase + string.digits
    for x in range(1, n + 1):
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=x, instance_name=instance_name)
        await queue.put(msg)
        logging.info(f'Published {x} of {n} messages')

    await queue.put(None)  # publisher is done


async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        # process the msg
        logging.info(f'Consumed {msg}')
        # unhelpful simulation of i/o work
        await asyncio.sleep(random.random())


def main():
    queue = asyncio.Queue()
    asyncio.run(publish(queue, 5))
    asyncio.run(consume(queue))


if __name__ == '__main__':
    main()
When we run this, we see:
$ python part-0/mayhem_1.py
18:38:02,124 INFO: Published 1 of 5 messages
18:38:02,124 INFO: Published 2 of 5 messages
18:38:02,124 INFO: Published 3 of 5 messages
18:38:02,124 INFO: Published 4 of 5 messages
18:38:02,124 INFO: Published 5 of 5 messages
18:38:02,124 INFO: Consumed PubSubMessage(instance_name='cattle-pdcg')
18:38:02,188 INFO: Consumed PubSubMessage(instance_name='cattle-nbs9')
18:38:02,952 INFO: Consumed PubSubMessage(instance_name='cattle-hw4f')
18:38:03,075 INFO: Consumed PubSubMessage(instance_name='cattle-bza9')
18:38:03,522 INFO: Consumed PubSubMessage(instance_name='cattle-gjl4')
We’ll use this as the starting point for a pub/sub simulator.
Running an asyncio-based Service
So far, we don’t have a running service; it’s merely a pipeline or a batch job right now.
Also note that this asyncio.run is new as of 3.7:
# <---snip--->
def main():
    queue = asyncio.Queue()
    asyncio.run(publish(queue, 5))
    asyncio.run(consume(queue))
Before 3.7, the boilerplate code to run the event loop was like so:
# <---snip--->
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(publish(queue, 5))
    loop.run_until_complete(consume(queue))
    loop.close()
    logging.info("Successfully shutdown the Mayhem service.")
Basically, we had to set up and tear down the loop ourselves. Note that it is a good habit to clean up and close the event loop, since we were the ones who created it.
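As a rough sketch of what that boilerplate amounts to, the helper below (a hypothetical run_manually, not part of the Mayhem code) creates a loop, runs a single coroutine on it, and closes the loop in a finally block so the cleanup happens even if the coroutine raises. It assumes asyncio.new_event_loop() in place of asyncio.get_event_loop(), since newer Python releases discourage asking for an implicit loop; the answer coroutine is just a placeholder.

```python
import asyncio


async def answer():
    """Placeholder coroutine standing in for real work."""
    await asyncio.sleep(0)
    return 42


def run_manually(coro):
    """Hypothetical helper: run one coroutine on a loop we create and close ourselves."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro), loop
    finally:
        loop.close()  # always clean up the loop we created


result, loop = run_manually(answer())
same = asyncio.run(answer())  # 3.7+: creates and closes a loop for us

print(result, same, loop.is_closed())  # prints: 42 42 True
```

Both routes produce the same result; asyncio.run simply takes the setup and teardown off our hands.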
But this is a service, so we don’t want it to run just once; we want it to continually consume from a publisher. Unfortunately, there isn’t a decent way to start a long-running service that is not an HTTP server in Python 3.7, so we’ll stick with the pre-3.7 boilerplate.
With that, we’ll create tasks out of the two coroutines using loop.create_task, which will schedule them on the loop. Then we start the loop, telling it to run forever.
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.create_task(publish(queue, 5))
    loop.create_task(consume(queue))
    loop.run_forever()
    loop.close()
    logging.info("Successfully shutdown the Mayhem service.")
When running with this updated code, we see that all messages are published and then consumed. Then we hang because there is no more work to be done; we only published 5 messages, after all. To stop the “hanging” process, we must interrupt it (via ^C or sending a signal like kill -15 <pid>):
$ python part-0/mayhem_3.py
13:48:43,797 INFO: Published 1 of 5 messages
13:48:43,797 INFO: Published 2 of 5 messages
13:48:43,797 INFO: Published 3 of 5 messages
13:48:43,797 INFO: Published 4 of 5 messages
13:48:43,797 INFO: Published 5 of 5 messages
13:48:43,797 INFO: Consumed PubSubMessage(instance_name='cattle-940o')
13:48:43,860 INFO: Consumed PubSubMessage(instance_name='cattle-ebeq')
13:48:43,875 INFO: Consumed PubSubMessage(instance_name='cattle-vuxk')
13:48:44,733 INFO: Consumed PubSubMessage(instance_name='cattle-cfxt')
13:48:45,277 INFO: Consumed PubSubMessage(instance_name='cattle-i54x')
^CTraceback (most recent call last):
  File "part-0/mayhem_3.py", line 91, in <module>
    main()
  File "part-0/mayhem_3.py", line 85, in main
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt
That’s nice and… ugly. You may notice that we never get to the log line "Successfully shutdown the Mayhem service."; we are not actually closing the loop either. Let’s add a quick fix.
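As an aside, the hang is simply run_forever doing its job: create_task only schedules work, and once started the loop keeps running until something stops it. Here is a minimal sketch of that behavior, assuming a throwaway worker coroutine and a timer calling loop.stop() as a stand-in for the interrupt (neither is part of the Mayhem code).

```python
import asyncio

events = []


async def worker():
    """Throwaway coroutine; records that it actually ran."""
    events.append("worker ran")


loop = asyncio.new_event_loop()
task = loop.create_task(worker())  # scheduled, but nothing has run yet
events.append("task scheduled")

# Stand-in for an interrupt: ask the loop to stop after 50 ms.
loop.call_later(0.05, loop.stop)
try:
    loop.run_forever()  # blocks here until loop.stop() is called
finally:
    loop.close()

print(events)  # prints: ['task scheduled', 'worker ran']
```

Without the call_later line, this script would sit in run_forever indefinitely, just like our service does.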
Running the event loop defensively
Our main function is just typical boilerplate code to get the service running. We have a queue instance, set up the loop, schedule the publish & consume tasks, and start then close the event loop:
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.create_task(publish(queue, 5))
    loop.create_task(consume(queue))
    loop.run_forever()
    loop.close()
    logging.info("Successfully shutdown the Mayhem service.")
We’ve been stopping our service with CTRL-C, a.k.a. SIGINT, a.k.a. KeyboardInterrupt. So let’s wrap our running of the service in a try/except/finally around that interrupt signal:
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue, 5))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")


if __name__ == "__main__":
    main()
And running it again, sending CTRL-C after it is done consuming:
$ python part-0/mayhem_4.py
14:38:31,394 INFO: Published 1 of 5 messages
14:38:31,394 INFO: Published 2 of 5 messages
14:38:31,394 INFO: Published 3 of 5 messages
14:38:31,395 INFO: Published 4 of 5 messages
14:38:31,395 INFO: Published 5 of 5 messages
14:38:31,395 INFO: Consumed PubSubMessage(instance_name='cattle-yyfk')
14:38:31,956 INFO: Consumed PubSubMessage(instance_name='cattle-hppg')
14:38:32,740 INFO: Consumed PubSubMessage(instance_name='cattle-vlxt')
14:38:33,145 INFO: Consumed PubSubMessage(instance_name='cattle-1qww')
14:38:34,69 INFO: Consumed PubSubMessage(instance_name='cattle-vwfo')
^C14:38:35,991 INFO: Process interrupted
14:38:35,992 INFO: Successfully shutdown the Mayhem service.
This is clean enough for now, but later on we’ll build on it by adding a graceful shutdown in part 2 and proper exception handling in part 3.
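If you want to convince yourself of the ordering without reaching for the keyboard, here is a small sketch of the same try/except/finally shape. It assumes a made-up fake_ctrl_c callback that raises KeyboardInterrupt from inside the loop as a stand-in for a real CTRL-C, and it records events in a list instead of logging them.

```python
import asyncio

events = []


def fake_ctrl_c():
    """Made-up stand-in for pressing CTRL-C while the loop is running."""
    raise KeyboardInterrupt


def main():
    loop = asyncio.new_event_loop()
    try:
        loop.call_later(0.05, fake_ctrl_c)
        loop.run_forever()  # the KeyboardInterrupt propagates out of here
    except KeyboardInterrupt:
        events.append("interrupted")
    finally:
        loop.close()  # runs whether or not we were interrupted
        events.append("closed")
    return loop


loop = main()
print(events, loop.is_closed())
```

The except branch handles the interrupt, and the finally branch guarantees the loop is closed either way.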
Follow the next part of this series for Part 1: true concurrency, or skip ahead to Part 2: adding a graceful shutdown, Part 3: exception handling, Part 4: working with synchronous and threaded code, Part 5: testing, Part 6: debugging, or Part 7: profiling asyncio code.
∴