Wednesday, April 23, 2014

Celery Multiple Queues Setup

Here is an issue I had to handle lately. I have a kind of chat feature in the app I am developing.

General outline:
you post a message, it's sent to the server, where it's saved, and then forwarded to a pubsub server (running on Tornado) that pushes it to all subscribed clients. Sending the message to the pubsub server is done by a task, which means Celery.
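Just to make that outline concrete, here is a minimal sketch of what such a push task might look like; the pubsub URL, the payload format and the use of requests are my assumptions for illustration, not the app's real code:

import json

import celery
import requests  # assumed HTTP client, any would do


@celery.task
def push_to_pubsub(message):
    # POST the saved message to the Tornado pubsub server (hypothetical
    # URL), which then pushes it out to all subscribed clients.
    requests.post('http://localhost:8888/publish',
                  data=json.dumps({'message': message}))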

Until recently, it was fine for us to have one Celery worker, running together with the beat. But as time went on, one of the tasks grew quite big, due to the amount of data it has to handle, and now takes about 6 minutes, making the other, more time-sensitive tasks look like they are not running at all (they do run, just too late to be of any real value).
It became quite obvious to me that I needed to split the queue into two: one queue for fast tasks and one queue for slow tasks. But how? I went to the Celery docs and read them through and through, until my eyes hurt; the best I got was two queues running all the tasks at the same time, with no routing whatsoever. I called a few people, poked the folks in the #celery IRC channel, visited the #django IRC channel and, of course, went for help to the Stack Overflow community. All I got, at best, was silence; at worst, bad, unhelpful suggestions (some people talk just to hear themselves). So, seeing as no help was coming from anywhere, I decided to go back to basics, read some more Celery docs, and start from scratch.
In cases like this, I find it a good practice to start over from the beginning if I can't get anywhere with what I have; in 9 out of 10 cases, it helps.

I will not go over everything I've tried, so here is my full solution so far; it works beautifully.

System tech details:
Django - 1.6.2
Celery - 3.1.10
RabbitMQ - 3.3.0
Python - 2.7.5

Settings

from kombu import Exchange, Queue  # needed for the queue definitions below

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERY_TASK_RESULT_EXPIRES = 60  # 1 minute
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('long', Exchange('long'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'proj.app.tasks.LongTask': {
        'queue': 'long',
        'routing_key': 'long_tasks',
    },
    'proj.app2.tasks.another_long_task': {
        'queue': 'long',
        'routing_key': 'long_tasks',
    },
}
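These routes are only defaults, by the way. If you ever need to send one particular call somewhere else, Celery lets you override the queue per call via apply_async:

# one-off override of the configured route
another_long_task.apply_async(queue='long', routing_key='long_tasks')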

Tasks

Let's take a look at some example tasks: two that need to be routed to the 'long' queue and one that will go to the 'default' queue.


import celery


class LongTask(celery.Task):
    def run(self, *args, **kwargs):
        # do some cool stuff over here
        pass


@celery.task
def another_long_task():
    # do some other cool stuff over here, for a very long time
    pass


@celery.task
def fast_task():
    print 'i am real fast'



Let's say 'LongTask' is run by the beat every 3 minutes, 'another_long_task' is run when a user clicks 'run another_long_task' (via another_long_task.delay()), and 'fast_task' is also run by the beat, every 2 minutes.
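For completeness, here is a rough sketch of what that click handler could look like (the view and its module are hypothetical, the point is just the .delay() call):

# views.py - hypothetical handler behind the 'run another_long_task' button
from django.http import HttpResponse

from proj.app2.tasks import another_long_task


def run_another_long_task(request):
    another_long_task.delay()  # returns immediately, a worker picks it up
    return HttpResponse('task queued')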


Celery Beat Schedule

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'long_task': {
        'task': 'proj.app.tasks.LongTask',
        'schedule': crontab(minute='*/3'),
    },
    'fast_task': {
        'task': 'proj.app3.tasks.fast_task',
        'schedule': crontab(minute='*/2'),
    },
}

As you can see, nothing is different in how I defined those two tasks, long and fast. Routing is worked out according to what you specify in CELERY_ROUTES; nothing in the task definitions says which queue they should go to, either. Generally, it's good policy to centralize this kind of setting: if you ever need to change one or more routes, you know exactly where to look, no need to jump from place to place. Also note that a task will be routed by the first route that matches its name. So if the same task were matched by two routes, one after another, pointing to different queues, the first one read would win the race.
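If you ever need that ordering to be explicit, CELERY_ROUTES also accepts a sequence of routers that are consulted in order; here is a sketch (the class name is mine):

class LongTasksRouter(object):
    """Routers are consulted in the order they appear in CELERY_ROUTES;
    the first one that returns a route wins."""

    def route_for_task(self, task, args=None, kwargs=None):
        if task in ('proj.app.tasks.LongTask',
                    'proj.app2.tasks.another_long_task'):
            return {'queue': 'long', 'routing_key': 'long_tasks'}
        return None  # fall through to the next router / default queue

CELERY_ROUTES = (LongTasksRouter(),)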

How to run Celery

Open a terminal, go to your project's virtualenv and activate it. Do that two more times; you should have three terminal tabs/windows open.

terminal 1
$ celery -A proj worker -Q default -l debug -n default_worker

terminal 2
$ celery -A proj worker -Q long -l debug -n long_worker

If everything went OK, you should see in both terminals that the workers acknowledge each other (a 'default_worker/long_worker has joined the party' message) and sync; each one should also print all the tasks it sees as registered.
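If you want to double-check which queues each worker actually consumes from, Celery's inspect command can list them:

$ celery -A proj inspect active_queues

Each worker should report only the queue you passed to it with -Q.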

terminal 3
$ celery -A proj beat -l debug

This will start the beat. Now just sit back and watch how your tasks are routed to the appropriate queues.

Running the same setup on Heroku

Here is the Procfile I used to run this setup on Heroku (it is from the real project, called 'arena', where the slow queue is named 'feeds', so the names differ slightly from the 'proj'/'long' examples above).
Unlike on a Linux server, or any other server with sudo access, where you would probably set up supervisor to run the beat and the workers as daemons, on Heroku this setup means 3 dynos ($$).

web: run-program gunicorn arena.wsgi
celery_beat: run-program celery -A arena beat -l info
celery1: run-program celery -A arena worker -Q default -l info --purge -n default_worker
celery2: run-program celery -A arena worker -Q feeds -l info --purge -n feeds_worker
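One more Heroku detail: non-web process types don't start by themselves after a deploy, you have to scale them up yourself (dyno names as in the Procfile above):

$ heroku ps:scale celery_beat=1 celery1=1 celery2=1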



Hope this saves you the hours and energy I wasted figuring this all out.

And for those who find it hard to read the code in this post, here is a link to a gist.
And here is my question on Stack Overflow.