Routing Celery tasks for simple prioritization


Like most businesses, where I work we need to send lots of notifications to our users, mainly emails and push notifications; the setup is quite simple:

  • a service accepts requests to send notifications to users

  • the notification service prepares the message and puts it in a queue

  • a pool of workers fetches messages from the queue and performs the actual delivery

This works reasonably well and we can scale the service by increasing the number of instances of the notification service and of the delivery workers.

This setup is also used when a user requests an export of her/his historical data; since this process can take a while, a background job fetches the data, generates a PDF and sends it via email using the notifications service. At the same time we generate hundreds of thousands of notifications, usually in the morning, and this fills up the notification queue, so if a user requests an export during this time frame the email has to wait a long time before it can be processed.


We have evaluated a couple of solutions:
  • per message priority

  • dedicated low priority queue for high volume automatically generated notifications using task routing

The first solution is a generic one but, as far as we have seen, it is not easy to guarantee that a high priority message will be delivered within our desired time frame. We opted for the second solution because we don't need a fine grained prioritization system (maybe in the future), just a way to keep delivering user generated notifications while we are sending our automated high volume notifications during the morning.


Our stack is based on Celery and is composed mainly of two parts:
  • the notifications service that sends messages to a queue

  • a pool of workers that fetch messages from the queue and deliver the notifications

To achieve our goal we only had to change the way the notifications service sends messages to the queue, specifying the low or default priority queue based on the message type, and run a dedicated pool of workers bound to each priority queue.
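The choice of queue per message type can be sketched as a small helper. This is only an illustration, not our production code: the message type names, the `AUTOMATED_TYPES` set, the `queue_for` function and the queue name `low` are all hypothetical; only the `queue` argument of `apply_async` comes from Celery itself.

```python
# Hypothetical message types that are generated automatically in bulk.
AUTOMATED_TYPES = {'daily_digest', 'morning_reminder'}

LOW_PRIORITY_QUEUE = 'low'      # assumed name of the low priority queue
DEFAULT_QUEUE = 'default'       # queue used for user generated notifications


def queue_for(message_type):
    """Return the name of the queue a notification of this type goes to."""
    if message_type in AUTOMATED_TYPES:
        return LOW_PRIORITY_QUEUE
    return DEFAULT_QUEUE


# With a Celery task the result would be passed along when sending, e.g.:
#   task.apply_async(args=(prio, message), queue=queue_for(message_type))
print(queue_for('daily_digest'))
print(queue_for('data_export'))
```

Keeping the mapping in one place means that moving a message type to the other queue is a one line change and the workers do not need to know about message types at all.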

Example code with routing:

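from itertools import count

from celery import Celery

celery = Celery()
celery.main = 'Test app'

# register the function as a Celery task so that apply_async is available
@celery.task
def task(prio, message):
    print(f'{prio}: {message}')

# placeholder message source, for illustration only; any iterator works
generator = (f'message {n}' for n in count())

# calling the task specifying the queue
task.apply_async(args=('default', next(generator)), queue='default')

How to run a worker specifying the queue: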

$ celery -A tasks worker --loglevel=INFO -Q default

This solution works but there is an efficiency problem: when the low priority queue is empty the low priority workers sit idle, wasting precious (and paid) resources. Fortunately there is a simple fix, because it is possible to specify more than one queue with the -Q parameter, as a comma separated list. This way we have a dedicated pool of workers for messages generated by user activity and a second pool of workers that handles the automated messages and, when those are finished, can help with default priority messages.
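The two pools can be sketched by building the corresponding worker command lines. This is a minimal sketch under a few assumptions: the `worker_command` helper is hypothetical, the queue name `low` is assumed, and the app name `tasks` is taken from the worker command shown earlier.

```python
import shlex

APP = 'tasks'  # module name, taken from the worker command shown earlier


def worker_command(queues):
    """Build the command line of a worker consuming from the given queues."""
    # -Q accepts a comma separated list of queue names
    return ['celery', '-A', APP, 'worker', '--loglevel=INFO',
            '-Q', ','.join(queues)]


# Pool dedicated to user generated notifications.
user_pool = worker_command(['default'])
# Pool for automated notifications that also consumes the default queue.
shared_pool = worker_command(['low', 'default'])

print(shlex.join(user_pool))
print(shlex.join(shared_pool))
```

The lists can be handed to `subprocess.Popen` or copied into a process manager configuration; the only difference between the two pools is the value passed to -Q.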

An example implementation is provided in this repo, with instructions to run the different use cases.

P.S. We are hiring!
