Example code can be found on GitHub. All code in this post is licensed under MIT.
Mayhem Mandrill Recap
The goal for this 8-part series is to build a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes messages from a pub/sub and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.
We’re going to take a slight detour and not build on the previous part. Instead, let’s revisit the synchronous pub/sub code that we built in Part 1:
```python
import logging
import queue
import random
import string
import time

import attr

# NB: Using f-strings with log messages may not be ideal since no matter
# what the log level is set at, f-strings will always be evaluated,
# whereas passing the arguments separately (logging.info("foo %s", "bar"))
# is lazily-evaluated. But I just love f-strings.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id = attr.ib(repr=False)
    hostname = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"


def publish(queue, n):
    """Simulates an external publisher of messages.

    Args:
        queue (queue.Queue): Queue to publish messages to.
        n (int): Number of messages to publish.
    """
    choices = string.ascii_lowercase + string.digits
    for x in range(1, n + 1):
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=x, instance_name=instance_name)
        # publish an item
        queue.put(msg)
        logging.info(f"Published {x} of {n} messages")

    # indicate the publisher is done
    queue.put(None)


def consume(queue):
    """Consumer client to simulate subscribing to a publisher.

    Args:
        queue (queue.Queue): Queue from which to consume messages.
    """
    while True:
        # wait for an item from the publisher
        msg = queue.get()

        # the publisher emits None to indicate that it is done
        if msg is None:
            break

        # process the msg
        logging.info(f"Consumed {msg}")
        # simulate i/o operation using sleep
        time.sleep(random.random())


def main():
    q = queue.Queue()
    publish(q, 5)
    consume(q)


if __name__ == "__main__":
    main()
```
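Lazy formatting only happens when the arguments are passed to the logging call separately, as in `logging.info("foo %s", "bar")`. A string built beforehand, whether with `%` or an f-string, is always evaluated. The minimal sketch below checks this: it counts how often an object is turned into a string by a disabled DEBUG call. The `Expensive` class and the `lazy-demo` logger name are made up for illustration.

```python
import logging


class Expensive:
    """Counts how many times it is converted to a string."""

    def __init__(self):
        self.calls = 0

    def __str__(self):
        self.calls += 1
        return "expensive"


logger = logging.getLogger("lazy-demo")
logger.setLevel(logging.INFO)  # DEBUG records are disabled for this logger

obj = Expensive()

# %-style arguments are only formatted if a record is actually emitted,
# so this disabled DEBUG call never calls __str__.
logger.debug("value: %s", obj)
lazy_calls = obj.calls

# The f-string is built before logger.debug() is even called.
logger.debug(f"value: {obj}")
eager_calls = obj.calls - lazy_calls

print(lazy_calls, eager_calls)  # 0 1
```

The cost only matters when the formatted objects are expensive to render or the logging call sits in a hot loop.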
Making Synchronous Code asyncio-Friendly
I’m sure that as folks have started to use `asyncio`, they’ve realized that `async`/`await` starts permeating everything around the code-base; everything needs to be async. This isn’t necessarily a bad thing; it just forces a shift in perspective.
First, let’s adapt the synchronous publisher so that it continually publishes messages, similar to our async version. We’re going to rename `publish` to `publish_sync` so it’s easier for us to differentiate:
```python
# <-- snip -->
import random
import time
import uuid
# <-- snip -->


def publish_sync(queue):
    """Simulates an external publisher of messages.

    Args:
        queue (queue.Queue): Queue to publish messages to.
    """
    choices = string.ascii_lowercase + string.digits
    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        queue.put(msg)
        logging.info(f"Published {msg}")
        # simulate randomness of publishing messages
        time.sleep(random.random())
```
Very similar to our asynchronous publisher. We’ll do the same for our consumer:
```python
def consume_sync(queue):
    while True:
        msg = queue.get()
        logging.info(f"Consumed {msg}")
        # Substitute for handling a message
        time.sleep(random.random())
```
Now let’s have an asynchronous `publish` coroutine function that will call our `publish_sync` function:
```python
# <-- snip -->
import concurrent.futures
# <-- snip -->


async def publish(queue):
    logging.info("Starting publisher")
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor()
    await loop.run_in_executor(executor, publish_sync, queue)
```
Here, I’m using a `ThreadPoolExecutor` from `concurrent.futures`. Then I `await` on `asyncio`’s `loop.run_in_executor` method, which submits the non-async work to a threadpool for us. `loop.run_in_executor` returns an `asyncio.Future` whose result we don’t need (since we aren’t returning anything from the `publish_sync` function).
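To see what `loop.run_in_executor` hands back, here’s a minimal, self-contained sketch. `blocking_work` is a made-up stand-in for `publish_sync` that returns a value so there’s something to await:

```python
import asyncio
import concurrent.futures
import time


def blocking_work(x):
    """Made-up stand-in for synchronous code such as publish_sync."""
    time.sleep(0.1)
    return x * 2


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # The function is submitted to the pool right away; we get back an
        # asyncio.Future that completes when the worker thread finishes.
        fut = loop.run_in_executor(executor, blocking_work, 21)
        is_asyncio_future = isinstance(fut, asyncio.Future)
        result = await fut
    return is_asyncio_future, result


is_asyncio_future, result = asyncio.run(main())
print(is_asyncio_future, result)  # True 42
```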
Note: We could use a `ProcessPoolExecutor` instead of a `ThreadPoolExecutor`, but the locks underneath `queue.Queue` are not pickleable, and anything handed to a `ProcessPoolExecutor` must be pickleable.
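A quick way to confirm the pickling limitation. The `can_pickle` helper is just for illustration; `pickle.dumps` raises `TypeError` on the thread locks that `queue.Queue` holds internally:

```python
import pickle
import queue


def can_pickle(obj):
    """Illustrative helper: True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


print(can_pickle(queue.Queue()))  # False: the queue holds threading locks
print(can_pickle([1, 2, 3]))      # True
```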
You might ask, why the `await` and not `asyncio.create_task`? Two reasons. First, `loop.run_in_executor` returns an `asyncio.Future`, not a coroutine, and `asyncio.create_task` only accepts coroutines. So if we wanted to schedule the future like we do a task, we’d use `asyncio.ensure_future`. This is yet another confusing API name: given a coroutine, it wraps it in a task and schedules it, while given a future, it just hands that same future back. `asyncio.wrap_future` is what we’d reach for if we had a bare `concurrent.futures.Future` object (say, from `executor.submit`) that we wanted to make compatible with `asyncio`.
Second, we don’t have anything after the `await` in either the `consume` or `publish` coroutine functions. If we did, and we didn’t want to block ourselves, then we’d use `asyncio.ensure_future` to hold on to the future without awaiting it. The executor starts running the function as soon as `loop.run_in_executor` is called, so awaiting isn’t what kicks off the work.
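Here’s a sketch that exercises the three APIs side by side. `sync_job` is a made-up placeholder:

```python
import asyncio
import concurrent.futures


def sync_job():
    """Made-up placeholder for synchronous work."""
    return "done"


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        fut = loop.run_in_executor(executor, sync_job)

        # create_task() only accepts coroutines, so handing it a future fails.
        try:
            asyncio.create_task(fut)
            create_task_failed = False
        except TypeError:
            create_task_failed = True

        # ensure_future() given a future simply returns that same future.
        same_object = asyncio.ensure_future(fut) is fut

        # wrap_future() adapts a bare concurrent.futures.Future
        # (here from executor.submit) so it can be awaited.
        cf_future = executor.submit(sync_job)
        wrapped = asyncio.wrap_future(cf_future, loop=loop)

        results = (await fut, await wrapped)
    return create_task_failed, same_object, results


create_task_failed, same_object, results = asyncio.run(main())
```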
Now let’s set up the same for the `consume` coroutine function and `consume_sync`:
```python
async def consume(queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor()
    await loop.run_in_executor(executor, consume_sync, queue)
```
There’s no reason to have two separate threadpool executors, so we can abstract that out and pass it in to both coroutines (we could abstract away `consume` and `publish` completely, but I’m leaving those in for the sake of readability/comprehension):
```python
async def publish(executor, queue):
    logging.info("Starting publisher")
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, publish_sync, queue)


async def consume(executor, queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, consume_sync, queue)
```
Now let’s set up everything in our `main` function (we’ll add back our graceful shutdown in a bit):
```python
def main():
    executor = concurrent.futures.ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    q = queue.Queue()

    try:
        loop.create_task(publish(executor, q))
        loop.create_task(consume(executor, q))
        loop.run_forever()
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")
```
Let’s try to run this as is:
$ python part-4/mayhem_1.py
11:36:53,326 INFO: Starting publisher
11:36:53,326 INFO: Published message PubSubMessage(instance_name='cattle-aefp')
11:36:53,327 INFO: Starting consumer
11:36:53,327 INFO: Consumed PubSubMessage(instance_name='cattle-aefp')
11:36:53,327 INFO: Handling PubSubMessage(instance_name='cattle-aefp')
11:36:54,200 INFO: Published message PubSubMessage(instance_name='cattle-ftol')
11:36:54,200 INFO: Consumed PubSubMessage(instance_name='cattle-ftol')
11:36:54,200 INFO: Handling PubSubMessage(instance_name='cattle-ftol')
11:36:55,4 INFO: Published message PubSubMessage(instance_name='cattle-e9pi')
11:36:55,4 INFO: Consumed PubSubMessage(instance_name='cattle-e9pi')
11:36:55,5 INFO: Handling PubSubMessage(instance_name='cattle-e9pi')
11:36:55,396 INFO: Published message PubSubMessage(instance_name='cattle-5bfv')
^C11:36:56,423 INFO: Successfully shutdown the Mayhem service.
Traceback (most recent call last):
File "part-4/mayhem_1.py", line 135, in <module>
main()
File "part-4/mayhem_1.py", line 128, in main
loop.run_forever()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
self._run_once()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
event_list = self._selector.select(timeout)
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/selectors.py", line 558, in select
kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt
11:36:56,530 INFO: Consumed PubSubMessage(instance_name='cattle-042r')
11:36:56,530 INFO: Handling PubSubMessage(instance_name='cattle-042r')
11:36:56,664 INFO: Published message PubSubMessage(instance_name='cattle-opu7')
11:36:57,184 INFO: Published message PubSubMessage(instance_name='cattle-i4wy')
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
t.join()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1032, in join
self._wait_for_tstate_lock()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
There are a couple of things here: it doesn’t look like we’re being concurrent, and we need to press `CTRL-C` twice to get the script to stop (and look at that ugly traceback!).
Let’s tackle the concurrency one first. It’s more of a perception issue than anything.
Instead of one publisher and one consumer, let’s have five! We’ll use `asyncio.ensure_future` to similarly create and schedule the `asyncio.gather` futures on the loop. We can’t use `create_task` here, since it only accepts coroutines and `asyncio.gather` returns a future.
```python
async def publish(executor, queue):
    logging.info("Starting publisher")
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(executor, publish_sync, queue) for i in range(5)
    ]
    asyncio.ensure_future(asyncio.gather(*futures, return_exceptions=True))


async def consume(executor, queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(executor, consume_sync, queue) for i in range(5)
    ]
    asyncio.ensure_future(asyncio.gather(*futures, return_exceptions=True))
```
Running this again, we can see we are concurrent:
$ python part-4/mayhem_2.py
15:44:39,933 INFO: Starting publisher
15:44:39,934 INFO: Published PubSubMessage(instance_name='cattle-ylk2')
15:44:39,934 INFO: Published PubSubMessage(instance_name='cattle-ffbl')
15:44:39,934 INFO: Published PubSubMessage(instance_name='cattle-f3k2')
15:44:39,935 INFO: Published PubSubMessage(instance_name='cattle-xle6')
15:44:39,935 INFO: Published PubSubMessage(instance_name='cattle-excf')
15:44:39,935 INFO: Starting consumer
15:44:39,935 INFO: Consumed PubSubMessage(instance_name='cattle-ylk2')
15:44:39,935 INFO: Consumed PubSubMessage(instance_name='cattle-ffbl')
15:44:39,936 INFO: Consumed PubSubMessage(instance_name='cattle-f3k2')
15:44:39,936 INFO: Consumed PubSubMessage(instance_name='cattle-xle6')
15:44:39,936 INFO: Consumed PubSubMessage(instance_name='cattle-excf')
15:44:40,36 INFO: Published PubSubMessage(instance_name='cattle-1vez')
15:44:40,151 INFO: Consumed PubSubMessage(instance_name='cattle-1vez')
15:44:40,497 INFO: Published PubSubMessage(instance_name='cattle-c7xm')
15:44:40,497 INFO: Consumed PubSubMessage(instance_name='cattle-c7xm')
15:44:40,521 INFO: Published PubSubMessage(instance_name='cattle-d7ao')
15:44:40,521 INFO: Consumed PubSubMessage(instance_name='cattle-d7ao')
15:44:40,541 INFO: Published PubSubMessage(instance_name='cattle-835m')
15:44:40,560 INFO: Consumed PubSubMessage(instance_name='cattle-835m')
15:44:40,597 INFO: Published PubSubMessage(instance_name='cattle-h7tp')
15:44:40,598 INFO: Consumed PubSubMessage(instance_name='cattle-h7tp')
15:44:40,703 INFO: Published PubSubMessage(instance_name='cattle-58u7')
15:44:40,704 INFO: Consumed PubSubMessage(instance_name='cattle-58u7')
15:44:40,717 INFO: Published PubSubMessage(instance_name='cattle-yj4x')
15:44:40,726 INFO: Consumed PubSubMessage(instance_name='cattle-yj4x')
15:44:40,851 INFO: Published PubSubMessage(instance_name='cattle-ahua')
15:44:40,852 INFO: Consumed PubSubMessage(instance_name='cattle-ahua')
15:44:40,883 INFO: Published PubSubMessage(instance_name='cattle-jsob')
15:44:40,914 INFO: Published PubSubMessage(instance_name='cattle-36eb')
15:44:41,64 INFO: Published PubSubMessage(instance_name='cattle-dt8e')
15:44:41,115 INFO: Consumed PubSubMessage(instance_name='cattle-jsob')
15:44:41,294 INFO: Published PubSubMessage(instance_name='cattle-8smh')
15:44:41,330 INFO: Published PubSubMessage(instance_name='cattle-lp4o')
15:44:41,345 INFO: Published PubSubMessage(instance_name='cattle-yhsg')
15:44:41,369 INFO: Consumed PubSubMessage(instance_name='cattle-36eb')
15:44:41,370 INFO: Published PubSubMessage(instance_name='cattle-moi6')
15:44:41,388 INFO: Consumed PubSubMessage(instance_name='cattle-dt8e')
15:44:41,411 INFO: Consumed PubSubMessage(instance_name='cattle-8smh')
15:44:41,446 INFO: Consumed PubSubMessage(instance_name='cattle-lp4o')
15:44:41,461 INFO: Consumed PubSubMessage(instance_name='cattle-yhsg')
15:44:41,462 INFO: Consumed PubSubMessage(instance_name='cattle-moi6')
15:44:41,571 INFO: Published PubSubMessage(instance_name='cattle-8jud')
15:44:41,579 INFO: Published PubSubMessage(instance_name='cattle-6zg4')
15:44:41,625 INFO: Published PubSubMessage(instance_name='cattle-5lr9')
15:44:41,677 INFO: Consumed PubSubMessage(instance_name='cattle-8jud')
15:44:41,717 INFO: Consumed PubSubMessage(instance_name='cattle-6zg4')
15:44:41,723 INFO: Published PubSubMessage(instance_name='cattle-ex6h')
15:44:41,795 INFO: Published PubSubMessage(instance_name='cattle-v3bb')
15:44:41,823 INFO: Consumed PubSubMessage(instance_name='cattle-5lr9')
15:44:41,841 INFO: Consumed PubSubMessage(instance_name='cattle-ex6h')
^C15:44:41,861 INFO: Successfully shutdown the Mayhem service.
Traceback (most recent call last):
File "part-4/mayhem_2.py", line 112, in <module>
main()
File "part-4/mayhem_2.py", line 105, in main
loop.run_forever()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
self._run_once()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
event_list = self._selector.select(timeout)
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/selectors.py", line 558, in select
kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt
15:44:41,945 INFO: Published PubSubMessage(instance_name='cattle-sayc')
15:44:41,998 INFO: Published PubSubMessage(instance_name='cattle-3uco')
15:44:42,39 INFO: Published PubSubMessage(instance_name='cattle-y5oo')
15:44:42,76 INFO: Published PubSubMessage(instance_name='cattle-nhx1')
15:44:42,108 INFO: Consumed PubSubMessage(instance_name='cattle-v3bb')
15:44:42,238 INFO: Consumed PubSubMessage(instance_name='cattle-sayc')
15:44:42,293 INFO: Published PubSubMessage(instance_name='cattle-7zpa')
15:44:42,492 INFO: Consumed PubSubMessage(instance_name='cattle-3uco')
15:44:42,499 INFO: Published PubSubMessage(instance_name='cattle-sju0')
15:44:42,549 INFO: Published PubSubMessage(instance_name='cattle-v0vl')
15:44:42,593 INFO: Consumed PubSubMessage(instance_name='cattle-y5oo')
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
t.join()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1032, in join
self._wait_for_tstate_lock()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Sweet! I’m going to remove the five consumers & publishers for simplicity and just have one each.
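For reference, here’s a standalone sketch of the `gather(..., return_exceptions=True)` pattern over executor futures. It uses a made-up `worker` function where one call fails, and it also confirms that `asyncio.gather` returns a future rather than a coroutine:

```python
import asyncio
import concurrent.futures


def worker(n):
    """Made-up stand-in for one of several blocking publishers/consumers."""
    if n == 3:
        raise ValueError(f"worker {n} failed")
    return n


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [loop.run_in_executor(executor, worker, n) for n in range(5)]
        # gather() returns a future, not a coroutine.
        gathered = asyncio.gather(*futures, return_exceptions=True)
        is_future = asyncio.isfuture(gathered)
        # With return_exceptions=True, the failure shows up as a value
        # in the results list instead of propagating.
        results = await gathered
    return is_future, results


is_future, results = asyncio.run(main())
```

One thing to keep in mind when five publishers and five consumers share one executor: each blocking `while True` function holds a worker thread for as long as it runs. The pool therefore needs at least ten workers (for example `ThreadPoolExecutor(max_workers=10)`), or some of those functions will never start. The default `max_workers` depends on the Python version and the machine’s CPU count.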
Now onto that second issue: needing to send the `SIGINT` signal (via `CTRL-C`) twice.
Graceful Shutdowns with Threads and asyncio
Let’s first re-add our signal handler and our `shutdown` coroutine function:
```python
# <-- snip -->
import signal
# <-- snip -->


async def shutdown(loop, signal=None):
    """Cleanup tasks tied to the service's shutdown."""
    if signal:
        logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info("Flushing metrics")
    loop.stop()


def main():
    executor = concurrent.futures.ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s)))
    q = queue.Queue()

    try:
        loop.create_task(publish(executor, q))
        loop.create_task(consume(executor, q))
        loop.run_forever()
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")
```
And try running as is:
$ python part-4/mayhem_3.py
12:30:15,718 INFO: Starting publisher
12:30:15,719 INFO: Published PubSubMessage(instance_name='cattle-ae2t')
12:30:15,719 INFO: Starting consumer
12:30:15,719 INFO: Consumed PubSubMessage(instance_name='cattle-ae2t')
12:30:15,757 INFO: Published PubSubMessage(instance_name='cattle-v88d')
12:30:15,757 INFO: Consumed PubSubMessage(instance_name='cattle-v88d')
12:30:16,184 INFO: Published PubSubMessage(instance_name='cattle-cb8y')
12:30:16,370 INFO: Published PubSubMessage(instance_name='cattle-62yg')
12:30:16,395 INFO: Published PubSubMessage(instance_name='cattle-0dvr')
^C12:30:16,634 INFO: Received exit signal SIGINT...
12:30:16,634 INFO: Closing database connections
12:30:16,634 INFO: Nacking outstanding messages
12:30:16,635 INFO: Cancelling 2 outstanding tasks
12:30:16,635 INFO: Flushing metrics
12:30:16,636 INFO: Successfully shutdown the Mayhem service.
12:30:16,642 INFO: Consumed PubSubMessage(instance_name='cattle-cb8y')
12:30:16,869 INFO: Consumed PubSubMessage(instance_name='cattle-62yg')
12:30:17,127 INFO: Published PubSubMessage(instance_name='cattle-i77v')
12:30:17,322 INFO: Published PubSubMessage(instance_name='cattle-clav')
12:30:17,568 INFO: Consumed PubSubMessage(instance_name='cattle-0dvr')
12:30:17,632 INFO: Published PubSubMessage(instance_name='cattle-5zdj')
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
t.join()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1032, in join
self._wait_for_tstate_lock()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Hm, well we were able to get rid of that first `KeyboardInterrupt` traceback, but we still have that second one.
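The first traceback goes away because `SIGINT` is now handled by the loop’s signal handler instead of raising `KeyboardInterrupt` inside `run_forever`. The core of `shutdown` (cancel every task except the current one, then gather them) can be tried in isolation. This is a minimal sketch; `forever` and `shutdown_tasks` are made-up names:

```python
import asyncio


async def forever(name, log):
    """Made-up long-running task that records when it gets cancelled."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append(f"{name} cancelled")
        raise


async def shutdown_tasks():
    # Same idea as shutdown(): cancel everything except the current task,
    # then wait for the cancellations to finish.
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    return await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    log = []
    asyncio.create_task(forever("publisher", log))
    asyncio.create_task(forever("consumer", log))
    await asyncio.sleep(0.05)  # let both tasks start running
    results = await shutdown_tasks()
    return log, results


log, results = asyncio.run(main())
```

In the Mayhem service, the tasks being cancelled are awaiting `loop.run_in_executor`. Cancelling them cancels the asyncio futures, but a function that is already running in a worker thread cannot be interrupted. That is why the threads keep logging after “Successfully shutdown the Mayhem service.”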
Looking at the docs for `Executor` objects, there’s a `shutdown` method that we can try. We’ll set `wait` to `False`, since the work will never finish executing because of the `while True` loops in our synchronous functions.
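Here’s a small sketch of what `shutdown(wait=False)` does and doesn’t do. A `threading.Event` stands in for a never-ending loop; `long_running` and `release` are made up for illustration. The call returns immediately and refuses new work, but a function that is already running keeps going:

```python
import concurrent.futures
import threading

release = threading.Event()


def long_running():
    """Made-up stand-in for a `while True` loop: blocks until released."""
    release.wait()
    return "finished"


executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_running)

# Returns immediately; the worker thread keeps running in the background.
executor.shutdown(wait=False)
still_running = not future.done()

# Once shut down, the executor refuses new work.
try:
    executor.submit(long_running)
    rejected = False
except RuntimeError:
    rejected = True

release.set()  # let the worker finish so the interpreter can exit cleanly
result = future.result(timeout=5)
```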
We’ll pass in the executor to our `shutdown` coroutine:
```python
async def shutdown(loop, executor, signal=None):
    """Cleanup tasks tied to the service's shutdown."""
    if signal:
        logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)

    logging.info("Shutting down executor")
    executor.shutdown(wait=False)

    logging.info("Flushing metrics")
    loop.stop()


def main():
    executor = concurrent.futures.ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, executor, signal=s)))
    # <-- snip -->
```
Let’s try again:
$ python part-4/mayhem_4.py
12:41:50,850 INFO: Starting publisher
12:41:50,850 INFO: Published PubSubMessage(instance_name='cattle-80jm')
12:41:50,851 INFO: Starting consumer
12:41:50,851 INFO: Consumed PubSubMessage(instance_name='cattle-80jm')
12:41:50,922 INFO: Published PubSubMessage(instance_name='cattle-t0sa')
12:41:51,487 INFO: Consumed PubSubMessage(instance_name='cattle-t0sa')
12:41:51,680 INFO: Published PubSubMessage(instance_name='cattle-fd4u')
^C12:41:52,35 INFO: Received exit signal SIGINT...
12:41:52,35 INFO: Closing database connections
12:41:52,35 INFO: Nacking outstanding messages
12:41:52,35 INFO: Cancelling 2 outstanding tasks
12:41:52,35 INFO: Shutting down ThreadPoolExecutor
12:41:52,35 INFO: Flushing metrics
12:41:52,36 INFO: Successfully shutdown the Mayhem service.
12:41:52,385 INFO: Published PubSubMessage(instance_name='cattle-3xu0')
12:41:52,432 INFO: Consumed PubSubMessage(instance_name='cattle-fd4u')
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
t.join()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1032, in join
self._wait_for_tstate_lock()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
This took some digging for me to figure out. The traceback tells us what happens during interpreter shutdown: the `atexit` function registered by `concurrent.futures` joins the executor’s threads, and `join()` waits to acquire each thread’s state lock. Since our threads never finish, it waits until we interrupt it. Let’s try releasing the locks on the threads associated with the executor:
```python
async def shutdown(loop, executor, signal=None):
    """Cleanup tasks tied to the service's shutdown."""
    if signal:
        logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)

    logging.info("Shutting down executor")
    executor.shutdown(wait=False)

    logging.info(f"Releasing {len(executor._threads)} threads from executor")
    for thread in executor._threads:
        try:
            thread._tstate_lock.release()
        except Exception:
            pass

    logging.info("Flushing metrics")
    loop.stop()
```
And running this:
$ python part-4/mayhem_5.py
12:45:21,902 INFO: Starting publisher
12:45:21,903 INFO: Published PubSubMessage(instance_name='cattle-03ly')
12:45:21,903 INFO: Starting consumer
12:45:21,903 INFO: Consumed PubSubMessage(instance_name='cattle-03ly')
12:45:21,928 INFO: Published PubSubMessage(instance_name='cattle-y6zb')
12:45:22,419 INFO: Consumed PubSubMessage(instance_name='cattle-y6zb')
12:45:22,558 INFO: Published PubSubMessage(instance_name='cattle-406g')
^C12:45:22,852 INFO: Received exit signal SIGINT...
12:45:22,852 INFO: Closing database connections
12:45:22,852 INFO: Nacking outstanding messages
12:45:22,852 INFO: Cancelling 2 outstanding tasks
12:45:22,852 INFO: Shutting down executor
12:45:22,852 INFO: Releasing 2 threads from executor
12:45:22,852 INFO: Flushing metrics
12:45:22,853 INFO: Successfully shutdown the Mayhem service.
Ah ha! Much cleaner.
You might be thinking: that’s kind of a hacky approach. In fact, it is, not least because it relies on the private `_threads` and `_tstate_lock` attributes. Googling around a bit, I happened upon this CPython bug. I question parts of the first response. The main issue is that even if we removed the `time.sleep` calls, we’d still need to interrupt the script twice. It also alludes to signals other than `SIGTERM` and `SIGINT` being supported, which is also not the case (while mixing up `SIGTERM` & `KeyboardInterrupt` :-!). So for now, we have to live with manually releasing the lock on the executor threads.
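An alternative worth knowing about, and not what this post does, only works if you are free to change the synchronous functions. The idea is to replace `while True` with a loop that checks a `threading.Event`, and to use a timeout on `queue.get` so that an idle consumer wakes up to check it. Once the functions return, the `atexit` join has nothing to wait on. This is a minimal sketch; `stop_event` and `consume_until_stopped` are hypothetical names:

```python
import concurrent.futures
import queue
import threading
import time

stop_event = threading.Event()  # hypothetical: shutdown() would set this


def consume_until_stopped(q, handled):
    """Hypothetical variant of consume_sync that exits once stop_event is set."""
    while not stop_event.is_set():
        try:
            # Wake up periodically so the stop flag is noticed even when idle.
            msg = q.get(timeout=0.1)
        except queue.Empty:
            continue
        handled.append(msg)


q = queue.Queue()
handled = []
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(consume_until_stopped, q, handled)

q.put("msg-1")
time.sleep(0.3)  # give the worker time to pick the message up

stop_event.set()
start = time.monotonic()
executor.shutdown(wait=True)  # returns once the loop notices the flag
elapsed = time.monotonic() - start
```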
Calling asyncio Code from Threads
We’ve set up how to run synchronous code with `asyncio` via threads, and how to clean up those threads upon shutdown. Now let’s hook in the rest of our asynchronous code, like `restart_host`, `save`, etc.
First, let’s add back in our global exception handler. Recall our `handle_exception` function:
```python
def handle_exception(loop, context):
    msg = context.get("exception", context["message"])
    logging.error(f"Caught exception: {msg}")
    logging.info("Shutting down...")
    asyncio.create_task(shutdown(loop))
```
We need to update `handle_exception` because `shutdown` now takes in another argument, our executor:
```python
def handle_exception(executor, loop, context):
    # context["message"] will always be there; but context["exception"] may not
    msg = context.get("exception", context["message"])
    logging.error(f"Caught exception: {msg}")
    logging.info("Shutting down...")
    asyncio.create_task(shutdown(loop, executor))
```
Now let’s re-add our global exception handler to our loop. The loop calls its exception handler with only `(loop, context)`, so now that ours also takes in `executor`, we need to make it a partial function before attaching it to our loop:
```python
# <-- snip -->
import functools
# <-- snip -->


def main():
    executor = concurrent.futures.ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, executor, signal=s)))
    handle_exc_func = functools.partial(handle_exception, executor)
    loop.set_exception_handler(handle_exc_func)
    # <-- snip -->
```
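A minimal sketch showing how the partial lines up with what asyncio passes in. The string `"fake-executor"` is a placeholder for a real executor, and `loop.call_exception_handler` is used to trigger the handler by hand:

```python
import asyncio
import functools

calls = []


def handle_exception(executor, loop, context):
    # Same argument order as the post's handler: the partial supplies
    # `executor`, and asyncio supplies (loop, context).
    calls.append((executor, context["message"]))


async def main():
    loop = asyncio.get_running_loop()
    executor = "fake-executor"  # placeholder object for this sketch
    loop.set_exception_handler(functools.partial(handle_exception, executor))
    # Invoke the configured handler the same way asyncio does internally.
    loop.call_exception_handler({"message": "something went wrong"})


asyncio.run(main())
```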
Now let’s add back in all our coroutine functions from earlier:
```python
async def restart_host(msg):
    # <-- snip -->


async def save(msg):
    # <-- snip -->


async def cleanup(msg, event):
    # <-- snip -->


async def extend(msg, event):
    # <-- snip -->


def handle_results(results, msg):
    # <-- snip -->


async def handle_message(msg):
    # <-- snip -->
```
And update our `consume_sync` function to call `handle_message` via the familiar `asyncio.create_task` function:
```python
def consume_sync(queue):
    while True:
        msg = queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.create_task(handle_message(msg))
```
Running it, we see we don’t get very far:
$ python part-4/mayhem_6.py
13:26:44,639 INFO: Starting publisher
13:26:44,639 INFO: Published PubSubMessage(instance_name='cattle-njjk')
13:26:44,639 INFO: Starting consumer
13:26:44,639 INFO: Consumed PubSubMessage(instance_name='cattle-njjk')
13:26:44,640 ERROR: Caught exception: no running event loop
13:26:44,640 INFO: Shutting down...
/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py:1776: RuntimeWarning: coroutine 'handle_message' was never awaited
handle = None # Needed to break cycles when an exception occurs.
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
13:26:44,643 INFO: Closing database connections
13:26:44,643 INFO: Nacking outstanding messages
13:26:44,643 INFO: Cancelling 1 outstanding tasks
13:26:44,644 INFO: Shutting down ThreadPoolExecutor
13:26:44,644 INFO: Releasing 2 threads from executor
13:26:44,644 INFO: Flushing metrics
13:26:44,644 INFO: Successfully shutdown the Mayhem service.
Notice the `Caught exception: no running event loop` error log line.
This makes sense: `asyncio.create_task` looks up the running loop for the current thread. At this point we’re in another thread, and the loop is only running in the main thread.
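A minimal sketch that reproduces the error on its own. The function below is a made-up stand-in for `consume_sync`: it runs in an executor thread and asks for the running loop, which `asyncio.get_running_loop()` only tracks per thread:

```python
import asyncio
import concurrent.futures


def in_worker_thread():
    """Made-up stand-in for consume_sync: runs in an executor thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        return str(exc)
    return "found a running loop"


async def main():
    loop = asyncio.get_running_loop()  # fine here: we're in the loop's thread
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return await loop.run_in_executor(executor, in_worker_thread)


worker_result = asyncio.run(main())
print(worker_result)  # no running event loop
```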
Let’s try giving our `consume_sync` function the loop explicitly, and calling `create_task` on the loop:
```python
def consume_sync(queue, loop):
    while True:
        msg = queue.get()
        logging.info(f"Consumed {msg}")
        loop.create_task(handle_message(msg))


async def consume(executor, queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    asyncio.ensure_future(loop.run_in_executor(executor, consume_sync, queue, loop))
```
And trying again:
$ python part-4/mayhem_7.py
16:29:26,173 DEBUG: Using selector: KqueueSelector
16:29:26,174 INFO: Starting publisher
16:29:26,175 INFO: Starting consumer
16:29:26,175 INFO: Published PubSubMessage(instance_name='cattle-fsia')
16:29:26,175 INFO: Consumed PubSubMessage(instance_name='cattle-fsia')
16:29:26,447 INFO: Published PubSubMessage(instance_name='cattle-jcmv')
16:29:26,447 INFO: Consumed PubSubMessage(instance_name='cattle-jcmv')
16:29:27,326 INFO: Published PubSubMessage(instance_name='cattle-xqvq')
16:29:27,327 INFO: Consumed PubSubMessage(instance_name='cattle-xqvq')
^C16:29:27,870 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-fsia')
16:29:27,870 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-jcmv')
16:29:27,871 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-xqvq')
16:29:27,871 INFO: Received exit signal SIGINT...
16:29:27,871 INFO: Closing database connections
16:29:27,871 INFO: Nacking outstanding messages
16:29:27,871 INFO: Cancelling 15 outstanding tasks
16:29:27,871 INFO: Shutting down ThreadPoolExecutor
16:29:27,871 INFO: Releasing 2 threads from executor
16:29:27,872 INFO: Flushing metrics
16:29:27,872 INFO: Successfully shutdown the Mayhem service.
Now we see that we only publish & consume messages, and that only after we interrupt the process do we get to `Extended deadline by 3 seconds for ...`, with no save or restart log lines.
`asyncio.ensure_future` itself also accepts a `loop` keyword argument to explicitly give it a loop to schedule the future on:
```python
async def consume(executor, queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    asyncio.ensure_future(
        loop.run_in_executor(executor, consume_sync, queue, loop), loop=loop
    )
```
Ya-ha! It works! Or so it looks…
We’re not threadsafe.
It can be difficult to tell when you’re not being threadsafe, particularly when it looks like it works, as it does above. But later on in the debugging section, I’ll show how we can easily surface threadsafety issues.
`asyncio` has a couple of threadsafe functions to use when scheduling a coroutine on the main thread from another thread. In particular, we’ll use `asyncio.run_coroutine_threadsafe`:
```python
def consume_sync(queue, loop):
    while True:
        msg = queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.run_coroutine_threadsafe(handle_message(msg), loop)
```
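To see the `concurrent.futures.Future` that `asyncio.run_coroutine_threadsafe` returns, here’s a self-contained sketch. `handle_message` and `consume_batch` are hypothetical stand-ins. Unlike the post’s fire-and-forget `consume_sync`, this worker blocks on each result, purely to show the value coming back across threads:

```python
import asyncio
import concurrent.futures


async def handle_message(msg):
    """Hypothetical stand-in for the post's handle_message coroutine."""
    await asyncio.sleep(0.01)
    return f"handled {msg}"


def consume_batch(msgs, loop):
    """Hypothetical worker-thread function; runs outside the loop's thread."""
    results = []
    for msg in msgs:
        # Schedules the coroutine on `loop` (running in the main thread) and
        # returns a concurrent.futures.Future this thread can block on.
        cf_future = asyncio.run_coroutine_threadsafe(handle_message(msg), loop)
        results.append(cf_future.result(timeout=5))
    return results


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return await loop.run_in_executor(executor, consume_batch, ["a", "b"], loop)


results = asyncio.run(main())
```

Blocking on `.result()` is safe here only because it happens in a worker thread while the loop stays free. Calling it from the loop’s own thread would deadlock.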
Basically, when working with threads and `asyncio`, use `asyncio`’s `_threadsafe` APIs.
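For plain callbacks rather than coroutines, the matching API is `loop.call_soon_threadsafe`. Unlike scheduling work on the loop with `loop.create_task` from another thread, it also wakes the loop up, so the callback runs promptly. Here’s a small sketch; all function names in it are made up:

```python
import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()
    loop_thread_id = threading.get_ident()
    done = asyncio.Event()
    received = []

    def on_loop(value):
        # Runs in the event loop's thread, so touching asyncio objects is safe.
        received.append((value, threading.get_ident() == loop_thread_id))
        done.set()

    def from_other_thread():
        # Schedule a plain callback onto the loop from a different thread.
        loop.call_soon_threadsafe(on_loop, "hello")

    worker = threading.Thread(target=from_other_thread)
    worker.start()
    await asyncio.wait_for(done.wait(), timeout=5)
    worker.join()
    return received


received = asyncio.run(main())
```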
Recap
It’s pretty simple to get around synchronous code using a `ThreadPoolExecutor` and `loop.run_in_executor`. However, it’s easy to get tripped up when mixing threads with `asyncio`, so it’s worth getting familiar with the few `_threadsafe` APIs in the `asyncio` library.