Example code can be found on GitHub. All code in this post is licensed under MIT.
Mayhem Mandrill Recap #
The goal for this 8-part series is to build a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.
At the end of part 1, our service looked like this:
import asyncio
import logging
import random
import string
import uuid

import attr

# NB: Using f-strings with log messages may not be ideal since no matter
# what the log level is set at, f-strings will always be evaluated
# whereas the old form ("foo %s" % "bar") is lazily-evaluated.
# But I just love f-strings.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id = attr.ib(repr=False)
    hostname = attr.ib(repr=False, init=False)
    restarted = attr.ib(repr=False, default=False)
    saved = attr.ib(repr=False, default=False)
    acked = attr.ib(repr=False, default=False)
    extended_cnt = attr.ib(repr=False, default=0)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"


async def publish(queue):
    """Simulates an external publisher of messages.

    Args:
        queue (asyncio.Queue): Queue to publish messages to.
    """
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.debug(f"Published message {msg}")
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


async def restart_host(msg):
    """Restart a given host.

    Args:
        msg (PubSubMessage): consumed event message for a particular
            host to be restarted.
    """
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")


async def save(msg):
    """Save message to a database.

    Args:
        msg (PubSubMessage): consumed event message to be saved.
    """
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")


async def cleanup(msg, event):
    """Cleanup tasks related to completing work on a message.

    Args:
        msg (PubSubMessage): consumed event message that is done being
            processed.
        event (asyncio.Event): event to wait for before acking the message.
    """
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.acked = True
    logging.info(f"Done. Acked {msg}")


async def extend(msg, event):
    """Periodically extend the message acknowledgement deadline.

    Args:
        msg (PubSubMessage): consumed event message to extend.
        event (asyncio.Event): event to watch for message extension or
            cleaning up.
    """
    while not event.is_set():
        msg.extended_cnt += 1
        logging.info(f"Extended deadline by 3 seconds for {msg}")
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)


async def handle_message(msg):
    """Kick off tasks for a given message.

    Args:
        msg (PubSubMessage): consumed message to process.
    """
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(save(msg), restart_host(msg))
    event.set()


async def consume(queue):
    """Consumer client to simulate subscribing to a publisher.

    Args:
        queue (asyncio.Queue): Queue from which to consume messages.
    """
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.create_task(handle_message(msg))


def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")


if __name__ == "__main__":
    main()
Graceful Shutdown #
Often, you'll want your service to shut down gracefully if it receives a POSIX signal of some sort, e.g. clean up open database connections, stop consuming messages, finish responding to current requests while not accepting new requests, etc. So, if we happen to restart an instance of our own service, we should clean up the "mess" we've made before exiting out.
We've been catching the commonly-known KeyboardInterrupt exception, like many other tutorials and libraries. But there are many common signals that a service should expect and handle. A few typical ones are (descriptions from man signal):
SIGHUP - Hangup detected on controlling terminal or death of controlling process
SIGQUIT - Quit from keyboard (via ^\)
SIGTERM - Termination signal
SIGINT - Interrupt program
There's also SIGKILL (i.e. the familiar kill -9) and SIGSTOP, although the standard is that they can't be caught, blocked, or ignored.
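If you want to poke at these from Python itself, the standard signal and os modules are all you need. Here's a quick sketch (my own, assuming a POSIX system) that lists the signals available on your platform and then sends our own process a SIGTERM:

import os
import signal

# every signal available on this platform, by name
print([s.name for s in signal.Signals])

# send ourselves a SIGTERM, equivalent to `kill -TERM <our pid>`;
# with the default handler, this terminates the process
os.kill(os.getpid(), signal.SIGTERM)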
Currently, if we quit our service via ^\ or send a signal via something like pkill -TERM -f <script path>, our service doesn't get a chance to clean up:
$ python part-1/mayhem_10.py
19:08:25,553 INFO: Consumed PubSubMessage(instance_name='cattle-npww')
19:08:25,554 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-npww')
19:08:25,655 INFO: Consumed PubSubMessage(instance_name='cattle-rm7n')
19:08:25,655 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-rm7n')
19:08:25,790 INFO: Saved PubSubMessage(instance_name='cattle-rm7n') into database
19:08:25,831 INFO: Saved PubSubMessage(instance_name='cattle-npww') into database
[1] 78851 terminated python part-1/mayhem_10.py
We see that we don't reach the finally clause.
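To see the difference in isolation, here's a tiny standalone sketch (separate from our service): interrupting it with ^C raises KeyboardInterrupt and the finally clause runs, but sending it a SIGTERM ends the process before any cleanup happens.

import time

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # ^C (SIGINT) raises KeyboardInterrupt, so we land here...
    print("caught KeyboardInterrupt")
finally:
    # ...and this runs. A plain SIGTERM, however, kills the process
    # before this clause is ever reached.
    print("cleaning up")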
Using a Signal Handler #
It should also be pointed out that, even if we were to only ever expect a KeyboardInterrupt / SIGINT signal, it could happen outside the catching of the exception, potentially causing the service to end up in an incomplete or otherwise unknown state:
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()  # <-- could happen here or earlier

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")  # <-- could happen here
    finally:
        loop.close()  # <-- could happen here
        logging.info("Successfully shutdown the Mayhem service.")  # <-- could happen here
So, instead of catching KeyboardInterrupt, let's attach some signal handlers to the loop.

First, we should define the shutdown behavior we want when a signal is caught:
async def shutdown(signal, loop):
    """Cleanup tasks tied to the service's shutdown."""
    logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks)
    logging.info("Flushing metrics")
    loop.stop()
Here I'm just closing the simulated database connections, returning messages to pub/sub as not acknowledged (so they can be redelivered and not dropped), and finally cancelling the tasks. We don't necessarily need to cancel pending tasks; we could just collect and allow them to finish. We may also want to take this opportunity to flush any collected metrics so they're not lost.
Let's hook this up to the main event loop now. We can also remove the KeyboardInterrupt catch, since that's now taken care of by adding signal.SIGINT as a handled signal (note this also means adding import signal alongside the other imports).
def main():
    loop = asyncio.get_event_loop()

    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    queue = asyncio.Queue()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")
Side note: You might have noticed that within the lambda closure, I bound s immediately. This is because without that, we end up running into an apparently common gotcha in Python-land: late bindings.
So now when I run the script and, in another terminal, run pkill -TERM -f "python part-2/mayhem_1.py" (or -HUP or -INT), we see the following:
$ python part-2/mayhem_1.py
# <--snip-->
16:22:44,160 INFO: Saved PubSubMessage(instance_name='cattle-hl5s') into database
16:22:44,258 INFO: Consumed PubSubMessage(instance_name='cattle-0a7a')
16:22:44,259 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-0a7a')
16:22:44,351 INFO: Restarted cattle-hl5s.example.net
16:22:44,369 INFO: Restarted cattle-0a7a.example.net
16:22:44,418 INFO: Saved PubSubMessage(instance_name='cattle-0a7a') into database
16:22:44,455 INFO: Restarted cattle-gd1y.example.net
16:22:44,524 INFO: Received exit signal SIGTERM...
16:22:44,524 INFO: Closing database connections
16:22:44,524 INFO: Nacking outstanding messages
16:22:44,525 INFO: Cancelling 14 outstanding tasks
Hmm… we kind of just hang. We have to run pkill -TERM -f "python part-2/mayhem_1.py" again to stop it:
$ python part-2/mayhem_1.py
# <--snip-->
16:22:44,160 INFO: Saved PubSubMessage(instance_name='cattle-hl5s') into database
16:22:44,258 INFO: Consumed PubSubMessage(instance_name='cattle-0a7a')
16:22:44,259 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-0a7a')
16:22:44,351 INFO: Restarted cattle-hl5s.example.net
16:22:44,369 INFO: Restarted cattle-0a7a.example.net
16:22:44,418 INFO: Saved PubSubMessage(instance_name='cattle-0a7a') into database
16:22:44,455 INFO: Restarted cattle-gd1y.example.net
16:22:44,524 INFO: Received exit signal SIGTERM...
16:22:44,524 INFO: Closing database connections
16:22:44,524 INFO: Nacking outstanding messages
16:22:44,525 INFO: Cancelling 14 outstanding tasks
16:22:47,776 INFO: Received exit signal SIGTERM... # <-- second signal needed
16:22:47,776 INFO: Closing database connections
16:22:47,777 INFO: Nacking outstanding messages
16:22:47,777 INFO: Cancelling 0 outstanding tasks
16:22:47,777 INFO: Flushing metrics
16:22:47,778 INFO: Successfully shutdown the Mayhem service.
This is happening because of the await asyncio.gather(*tasks) line in our shutdown coroutine function. We are cancelling tasks, which raises an asyncio.CancelledError exception in each of them. By default, asyncio.gather does not return exceptions as results; it raises the first one it encounters. Because of that, the service gets into a weird state: gather raises that first exception, but continues working on the other awaitables that were also passed into it.
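Here's a small, self-contained sketch of that default behavior, separate from our service (the coroutine names are made up for the demo):

import asyncio

async def boom():
    raise RuntimeError("boom")

async def slow():
    await asyncio.sleep(0.1)
    print("slow still ran to completion")

async def demo():
    try:
        await asyncio.gather(boom(), slow())
    except RuntimeError as e:
        # the first exception propagates to us right away...
        print(f"gather raised: {e}")
    # ...but the other awaitable keeps running in the background
    await asyncio.sleep(0.2)

asyncio.run(demo())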
Essentially, if we're not careful, exceptions get swallowed for us at this point. We could wrap our await asyncio.gather(*tasks) in a try/except clause, but we don't really need to care at this point if cancelling a task raises anything.
Sidebar: In production, I would probably care if cancelling a task raised anything other than an asyncio.CancelledError exception. But I don't want to get into anything too complicated right now; we'll get into more specific exception handling later on, in Exception Handling.
So instead, let's pass return_exceptions=True to asyncio.gather:
async def shutdown(signal, loop):
    """Cleanup tasks tied to the service's shutdown."""
    logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info("Flushing metrics")
    loop.stop()
Rerunning it (with pkill -TERM -f "python part-2/mayhem_2.py" issued from another terminal), we see the following:
$ python part-2/mayhem_2.py
# <--snip-->
16:37:50,289 INFO: Consumed PubSubMessage(instance_name='cattle-7fmg')
16:37:50,290 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-7fmg')
16:37:50,514 INFO: Saved PubSubMessage(instance_name='cattle-7fmg') into database
16:37:50,634 INFO: Consumed PubSubMessage(instance_name='cattle-4chp')
16:37:50,634 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-4chp')
16:37:50,684 INFO: Restarted cattle-eovi.example.net
16:37:50,785 INFO: Restarted cattle-4chp.example.net
16:37:50,870 INFO: Received exit signal SIGTERM...
16:37:50,870 INFO: Closing database connections
16:37:50,870 INFO: Nacking outstanding messages
16:37:50,870 INFO: Cancelling 13 outstanding tasks
16:37:50,871 INFO: Flushing metrics
16:37:50,872 INFO: Successfully shutdown the Mayhem service.
No need to send a signal more than once, now.
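Picking up on that earlier sidebar for a moment: if you did want to surface unexpected errors during shutdown while still ignoring the expected cancellations, one possible variation on the shutdown coroutine above (my own sketch, not part of the service) is to inspect what gather hands back:

async def shutdown(signal, loop):
    """Cleanup tasks tied to the service's shutdown, surfacing unexpected errors."""
    logging.info(f"Received exit signal {signal.name}...")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    # results come back in the same order as the tasks passed in
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for task, result in zip(tasks, results):
        # a plain CancelledError is expected here; anything else deserves a log line
        if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
            logging.error(f"Exception during shutdown of {task}: {result}")
    logging.info("Flushing metrics")
    loop.stop()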
Which Signals to Care About #
You might be asking which signals you should care about. And apparently, there is no standard.
|          | Hard Exit  | Graceful   | Reload/Restart |
|----------|------------|------------|----------------|
| nginx    | TERM, INT  | QUIT       | HUP            |
| Apache   | TERM       | WINCH      | HUP            |
| uWSGI    | INT, QUIT  | HUP, TERM  |                |
| Gunicorn | INT, QUIT  | TERM       | HUP            |
| Docker   | KILL       | TERM       |                |
For further understanding of how to properly handle signals with Docker, I highly recommend reading Why Your Dockerized Application Isn't Receiving Signals.
Heads Up: asyncio.shield Isn't Graceful #
As I discovered when reading about this handy little library, aiorun, another misleading API is asyncio.shield. The docs say it's a means to shield a future from cancellation. But if you have a coroutine that must not be cancelled during shutdown, asyncio.shield will not help you.
This is because the task that asyncio.shield creates gets included in asyncio.all_tasks, and therefore receives the cancellation signal like the rest of them.
To help illustrate, here’s a simple async function with a long sleep that we want to shield from cancellation:
import asyncio
import logging
import signal

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


async def cant_stop_me():
    logging.info("Can't stop me...")
    for i in range(12):
        logging.info("Sleeping for 5 seconds...")
        await asyncio.sleep(5)
    logging.info("Done!")


async def parent_task():
    # shielding should prevent `cant_stop_me` from cancellation if
    # `parent_task` gets cancelled (i.e. by `shutdown`)
    logging.info("Kicking off shielded task")
    await asyncio.shield(cant_stop_me())
    logging.info("Shielded task done")


async def main():
    asyncio.create_task(parent_task())
    await asyncio.sleep(60)


async def shutdown(signal, loop):
    logging.info(f"Received exit signal {signal.name}...")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]

    for task in tasks:
        # skipping over shielded coro still does not help
        if task._coro.__name__ == "cant_stop_me":
            continue
        task.cancel()

    logging.info("Cancelling outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info("Stopping loop")
    loop.stop()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    try:
        loop.run_until_complete(main())
    finally:
        logging.info("Successfully shutdown service")
        loop.close()
When running the script and then interrupting it (here with ^C), the coroutine is immediately cancelled. While we see logs related to the shutdown coroutine function and cleanup, we also see that cant_stop_me does get cancelled and the script errors out, when we should have kept seeing "Sleeping for 5 seconds..." log lines:
$ python part-2/mayhem_aside_1.py
13:19:47,517 INFO: Kicking off shielded task
13:19:47,517 INFO: Can't stop me...
13:19:47,517 INFO: Sleeping for 5 seconds...
^C13:19:48,451 INFO: Received exit signal SIGINT...
13:19:48,452 INFO: Cancelling outstanding tasks
13:19:48,452 INFO: Successfully shutdown service
Traceback (most recent call last):
File "mayhem_aside_1.py", line 75, in <module>
loop.run_until_complete(main())
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
concurrent.futures._base.CancelledError
13:19:48,458 ERROR: Task was destroyed but it is pending!
task: <Task pending coro=<cant_stop_me() done, defined at mayhem_aside_1.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1048c3c48>()]> cb=[shield.<locals>._done_callback() at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/tasks.py:776, gather.<locals>._done_callback() at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/tasks.py:664]>
13:19:48,458 ERROR: Task was destroyed but it is pending!
task: <Task pending coro=<shutdown() done, defined at mayhem_aside_1.py:50> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x1048c3d98>()]>>
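The underlying reason is easy to see in isolation: asyncio.shield wraps its argument in its own task, so that task shows up in asyncio.all_tasks() right alongside everything else. A small sketch of my own, peeking at the same private _coro attribute the shutdown above uses:

import asyncio

async def inner():
    await asyncio.sleep(0.1)

async def main():
    shielded = asyncio.shield(inner())
    # shield wrapped `inner` in a task of its own, so it appears here
    # alongside the task running `main`
    print(sorted(t._coro.__name__ for t in asyncio.all_tasks()))  # ['inner', 'main']
    await shielded

asyncio.run(main())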
Recap #
Unfortunately, we don't have any nurseries in asyncio core to clean ourselves up; it's up to us to be responsible and close up the connections and files we opened, respond to outstanding requests, and basically leave things how we found them.
Doing our cleanup in a finally clause isn't enough, though, since a signal could be sent outside of the try/except clause.
So, as we construct the loop, we should tell it how it should be deconstructed, as early in the program as possible. This ensures that "all our bases are covered" and that we're not leaving artifacts anywhere.
And finally, we also need to be aware of when our program should shut down, which is closely tied to how we run our program. If it's a manually-run script, then SIGINT is fine. But if it's within a daemonized Docker container, then SIGTERM is more appropriate.