Going Asynchronous with Redis and Celery for Django

Going Asynchronous: Redis and Celery for Django - From baby to Hero

By Marcelo Fernandes Nov 11, 2017


What to Expect from this Post

  • 1. Learn how to use Redis in an ordinary Python script
  • 2. Learn how to use Celery in an ordinary Python script
  • 3. Integrate Redis & Celery into your Django application
  • 4. Monitor your Celery tasks

In summary, today we will be talking about how to build a Django application that uses Redis and Celery effectively. We will start from the very basics, using Redis and Celery in bare Python scripts, and then migrate them to Django and grow into something very close to production. If you are new to these subjects, you can expect to learn a lot here; if you already have some experience, you can jump to the last sections of this post and maybe refresh some concepts or pick up new ones, who knows? So let's go!

Introduction - Redis and Celery

When we are talking about web apps with particular demands, we always end up running into Celery and Redis. Let's imagine a situation: you have a very demanding feature in your app, one that requires some computation time. The first question that pops up in your mind is: "How do I compute such a demanding task in Django without being slow or blocking the client request?"

Well, there are ways to optimize your Django apps and increase your performance, and I have written an entire post on that before (you may check it out by clicking here). But sometimes there is no easy and practical way to compute such a demanding feature in a timely fashion without blocking the worker that is serving the user navigating your application. So it would be nice to have an alternative... and this is where Redis and Celery come in.




Celery: Celery is a Distributed Task Queue; it is responsible for taking "tasks" from a queue and executing them on a worker server. In real life, it means that "someone" (a Python script) will be talking to a queue and saying: "Hey, I will drop this task on the queue, so Celery can pick it up and solve it for me while I do something else", which is exactly what we need.

Whenever a user makes a big request, what happens behind the scenes is: your client requests a URL of your application, your Django view processes this request, and since the request is heavy, you throw a task on the queue and it becomes someone else's problem to solve (Celery's); your Python code then takes back control and is able to send a response back to the client. This is good: in situations where you have 5 workers and 10000 users making requests, you don't want 5 users blocking the other 9995, right?
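To make that flow concrete, here is a minimal sketch of the pattern (the view and the generate_report task are hypothetical placeholders just for illustration; the real Django wiring comes later in this post):

# views.py (sketch) - the view only enqueues the work and answers right away
from django.http import JsonResponse
from .tasks import generate_report  # hypothetical Celery task

def report_view(request):
    # .delay() drops the task on the queue; a Celery worker picks it up later
    async_result = generate_report.delay(request.user.id)
    # respond immediately, without waiting for the heavy computation
    return JsonResponse({'task_id': async_result.id})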




Redis: This guy is both an in-memory cache system and a message broker. So what are these? An in-memory cache system is an ordinary cache that saves the data in the system's memory, fair enough? So it's pretty optimized for performance: whenever you want to retrieve data that you cached before, in-memory cache systems are the most performant way to do it. And what about message brokers? Well, from the description of Celery above, you must have been thinking: "I know what Python is, and now I know about Celery, but how will I implement this queue-messaging stuff?". That's where a message broker comes in; this is the piece of the puzzle that will deliver the tasks. It keeps listening to a queue (in our case the queue comes with the broker itself) and delegates tasks to our Celery workers as they come.

Redis gives us the best of two worlds. We can use it as a cache system, saving the stuff that we want to retrieve later, and it can act as the delivery mechanism for Celery tasks. We will dive into the details later, but you can already start to picture what we will be talking about.



Redis Installation

So first of all, let's get an overview of Redis, and for that matter the first thing you should do is get Redis. The installation process is very easy and is documented at https://redis.io/download; you might also install it using apt-get install redis (for Linux users) or using brew install redis (for macOS users).


# install the python library for redis
sudo pip install redis

Once you have the Python library for Redis, you have to run the Redis server:



redis-server

Note: if you downloaded Redis from the site, you have to run this command from inside the src folder.

Now you should see your Redis server running, with startup output that looks like this:



Once you get that, let's start the fun.

Working with Redis on a Python Script

redis-py has a pretty easy interface for Redis; normally all you want to know is how to set some data on the cache, retrieve it, or delete it. Those cases will probably cover your daily usage, but be aware that there are many more methods you can use. If you want to go deeper, take a look at the methods available in the redis-py documentation. Anyway, here is a quick example:



import redis

# Opens a Redis Connection pool on localhost.
# Port 6379 is the DEFAULT PORT.
r = redis.Redis(host='127.0.0.1', port=6379)


# ex stands for Expiration time
r.set(name='car:color', value='blue', ex=900)
r.set(name='car:price', value=100, ex=900)

color = r.get('car:color')
price = r.get('car:price')

print(color, price)
# b'blue' b'100'  (redis-py returns raw bytes by default)

# Number of seconds to expire.
print(r.ttl('car:color'))
# 900

# Remove the timeout
r.persist('car:color')

print(r.ttl('car:color'))
# None  (newer redis-py versions return -1 here instead)


r.delete('car:color')
print(r.get('car:color'))
# None

r.exists('car:color')
# False

In this example we opened a connection to Redis so we can pull data in and out, but sometimes we want to insert a bunch of data, and following this example might not be ideal, since every single insertion hits the cache server again. A more optimized way to do that is via a pipeline. With a pipeline you can buffer a bunch of commands and send them to the cache in a single round trip (all at once), and by default they are wrapped in a transaction, so they execute atomically. Check it out:



r.set('bing', 'bang')
r.set('ping', 'pong')
r.set('buzz', 'fizz')

pipe = r.pipeline()
# pipe = r.pipeline(transaction=False) <- if you don't want atomic transactions

pipe.set('foo', 'bar')
pipe.get('bing')
pipe.get('ping')

pipe.execute()
# [True, b'bang', b'pong']

In the majority of cases, one just has to know how to set, get, and check the timeout of a certain key on the Redis server, but be aware that there are many more commands and ways to use the redis-py library. If you are moving to a more advanced usage, I would recommend diving into the documentation/source code; there is plenty of interesting stuff there.
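If you want to peek a little further, here are a few more redis-py calls that tend to show up in day-to-day code (same r connection as above; exact return types can vary slightly between redis-py versions):

# Counters: INCR is atomic on the server side
r.set('visits', 0)
r.incr('visits')           # 1
r.incr('visits', 10)       # 11

# Hashes: store several fields under a single key
r.hset('car:1', 'color', 'blue')
r.hset('car:1', 'price', 100)
r.hgetall('car:1')         # {b'color': b'blue', b'price': b'100'}

# Put an expiration on an existing key
r.expire('car:1', 900)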

Celery Installation

Alright, time to check out what Celery is capable of. Installing Celery is easy, all you need is a pip install:


pip install Celery

Working with Celery on a Python Script

First let's go with our baby steps to understand what is going on. The first thing you will need is to configure your Celery app with a single and easy task:

my_task.py

from celery import Celery

app = Celery('my_task', broker='redis://localhost:6379/0')

@app.task
def pow(x, y):
    return x**y

Take a look at how the first argument to Celery is the name of the module it is running in. According to the docs, this is needed so that names can be generated automatically when tasks are defined in the __main__ module.

In this example we also created a task that returns x to the power of y. This is a very simple example, and our task is in the same file as our Celery app. That will change in the next steps, but let's see what we can come up with.

First, we will run Celery. For that matter we will create a worker, by executing our "my_task.py" script with the "worker" argument:


celery -A my_task worker --loglevel=info
# -A stands for app (Celery app) -> points to our my_task.py
# worker starts a worker instance
# loglevel can be: DEBUG, INFO, WARNING, ERROR, CRITICAL, or FATAL.

Now your machine has a worker dedicated entirely to running the task that we created in our Python script. So let's run it:


We opened two terminals: in the one on the right we ran our Celery worker, and in the one on the left we imported our function and ran our task through the delay method. You can see on the right that Celery grabbed our task, and that the Python code received an AsyncResult object back. This object can be used in different ways: to wait for the task until it's finished, to check the status of the task, or to get its return value.
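In case you cannot see the screenshot, the session on the left looked roughly like this (the exact task id will differ on your machine):

from my_task import pow

result = pow.delay(2, 3)
# the worker terminal on the right logs that it received and executed the task

result
# <AsyncResult: 1a2b3c4d-...>

result.status
# 'PENDING' -- without a result backend, Celery cannot tell us more than this,
# which is exactly what the next section addresses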

Working with Celery and Redis

Now we are going to merge our tools. The goal here is to save the result of our pow() task on Redis, and then retrieve the result or its status from our Python code.

First of all we need to set our Celery app to use Redis as its result backend; all we need to do is change our my_task.py:


app = Celery('my_task',
             backend='redis://localhost',
             broker='redis://localhost:6379/0')

Just add the parameter "backend", which is now going to be our Redis; this is the place where our task results will be stored.

Now if we run again:


from my_task import pow

result = pow.delay(4,2)

result.ready()
# False

result.get(timeout=1)
# 16

Now we can check the status and retrieve results. In practice you usually won't want to use the get() method, since it turns your asynchronous call into a synchronous one.
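More often you will keep the task id around and check on the task later, without blocking. A minimal sketch of that pattern (using the same my_task module; where you store the id is up to you):

from celery.result import AsyncResult
from my_task import app, pow

result = pow.delay(4, 2)
task_id = result.id   # store it somewhere: session, database, cache...

# ...later, possibly from a completely different process:
later = AsyncResult(task_id, app=app)
print(later.status)       # 'PENDING', 'STARTED', 'SUCCESS', 'FAILURE', ...
if later.ready():
    print(later.result)   # 16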

So, in summary, what we did here was: we created a Celery app that connects to a Redis server; once someone ran our task and asked for pow(), the worker on our computer (which runs independently of the script that made the call) processed it and saved the result on the Redis cache. We can then retrieve the result of the call we made (most often we won't) and check its status.

Therefore, now you can run asynchronous tasks from your Python code. For a more advanced example, check out a snippet that I created to fetch URLs using Celery.

Starting our Django App

Django takes care of a lot of the setup work you would otherwise have to do in order to get a proper environment for Redis and Celery. It takes some weight off your shoulders and lets you worry about the development of your application itself. To make it work, we are going to install some very nice libraries:


# I hope you already have Django, but if you don't:
pip install django
pip install celery
pip install django-redis

First of all, let's create our Django project


django-admin startproject django_project

Now let's make some modifications to your settings.py script.

First let's add our Redis configuration:

Add this variable to your settings.py

CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/1",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
        }
    }
}
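With that in place, the Django cache framework talks to Redis through django-redis, so anywhere in your project you can use the regular cache API. A quick sketch:

from django.core.cache import cache

cache.set('car:color', 'blue', timeout=900)
cache.get('car:color')    # 'blue'
cache.delete('car:color')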

That's all the cache configuration we are going to need for this example. Now let's configure Celery.

This is all you are going to need; if you have any problem with the integration, I'd recommend you spare some time to look at the Celery project documentation on how to set it up with Django. So let's keep going...

Your Django project should look like this:


django_project/
        manage.py
        django_project/
            __init__.py
            settings.py
            urls.py
            wsgi.py

Now we are going to create a file that will help you set up Celery whenever your application starts. It should live inside your configuration folder; in this case it is going to be called django_project/django_project/celery.py


django_project/django_project/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_project.settings')

app = Celery('django_project',
             backend='redis://localhost',
             broker='redis://localhost:6379/0')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Just take care to map the names to your own project name; in this case, just change the strings where we put "django_project" to whatever your project name is.
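By the way, since the app calls config_from_object with namespace='CELERY', you could also keep the broker/backend URLs in settings.py instead of hard-coding them in celery.py. A small sketch of that alternative (assuming Celery 4.x setting names with the CELERY_ prefix):

# django_project/django_project/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost'

# ...and then celery.py could create the app without hard-coded URLs:
# app = Celery('django_project')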

Now we have to make sure that this script is loaded when our Django app runs, so inside your django_project/django_project/__init__.py, put the following code:



from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

Now you can run your worker:


celery -A django_project worker -l info

Creating a View that Runs an Asynchronous Task

Now we know the basics about Celery and Redis, so let's make a sandbox to play around with the concepts that we have learned. In order to do that, we are going to create a Django app.


python manage.py startapp celery_sandbox

Now our project files:


django_project/django_project/settings.py

INSTALLED_APPS = [
    # ...
    'celery_sandbox',
]


django_project/django_project/urls.py

from django.conf.urls import url, include
from django.contrib import admin

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^celery-sandbox/', include('celery_sandbox.urls'))
]


django_project/celery_sandbox/urls.py

from django.conf.urls import url
from celery_sandbox.views import my_request, my_response

urlpatterns = [
    url(r'^request$', my_request, name='celery-request'),
    url(r'^response', my_response, name='celery-response')
]


django_project/celery_sandbox/templates/celery_sandbox_request.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Your Request!</title>
</head>
<body>

<h2>You can make your request clicking on the button below!</h2>

<a href="/celery-sandbox/response">
<button type="button">Request!</button>
</a>

</body>
</html>


django_project/celery_sandbox/templates/celery_sandbox_response.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Your Response!</title>
</head>
<body>

<h2>Your Task has Started, and soon will be finished!</h2>
<p>Thanks for your request.</p>

</body>
</html>


django_project/celery_sandbox/views.py

from django.shortcuts import render
from celery_sandbox.tasks import very_expensive_computation


def my_request(request):

    return render(request, 'celery_sandbox_request.html')


def my_response(request):

    very_expensive_computation.delay()

    return render(request, 'celery_sandbox_response.html')


django_project/celery_sandbox/tasks.py

import time
from celery import shared_task

@shared_task
def very_expensive_computation():
    time.sleep(10)
    return 42


Restart Celery so it registers the new tasks:



celery -A django_project worker -l info


The worker output should now list the registered task:



So, what did we do here? Apart from the Django boilerplate, we created two views. The first one is our request view, where we can fire off a task by clicking a button; it will look like this:



Once you click this button, a Celery task is invoked. With our Celery configuration, the app looks for registered tasks in a tasks.py script inside each Django app. The @shared_task decorator works like the basic one from the beginning of the tutorial; the difference is that @shared_task is more flexible, since it lets you define a task without a concrete app instance. This means we can use this task with other apps, making it reusable in other scenarios.
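Because of that, nothing stops another app, a management command, or a signal handler from reusing the very same task. A tiny sketch, assuming a hypothetical other_app inside the project:

# django_project/other_app/services.py (hypothetical)
from celery_sandbox.tasks import very_expensive_computation

def kick_off_background_work():
    # the same @shared_task can be enqueued from anywhere in the project
    return very_expensive_computation.delay()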

Since we have a worker dedicated entirely to solving tasks, after clicking the button we are redirected instantly to the response page, which looks like this:



This means that your Django worker wasn't blocked, and you didn't have to wait for the expensive computation to run; you just delegated the task to the Celery app, and it took care of it. Pretty cool, right?


If you look at your terminal, you can check that your worker received the task and then executed it:


[2017-11-19 18:35:14,241: INFO/MainProcess] Received task: celery_sandbox.tasks.very_expensive_computation[f4d7d4ee-8f78-46f6-8138-0486c7fd4160]
[2017-11-19 18:35:24,255: INFO/ForkPoolWorker-2] Task celery_sandbox.tasks.very_expensive_computation[f4d7d4ee-8f78-46f6-8138-0486c7fd4160] succeeded in 10.011330634000842s: 42

Extra: using Flower to check up on your tasks

Some day you will end up having a lot of tasks in your project, and you will be interested in monitoring them and getting to know their statuses. Flower is a Python library that provides a pretty nice and useful dashboard to monitor your tasks, know their status, check their results, and much more. Let's just install it quickly and take a look at it:



pip install flower


Running flower is pretty easy:



celery flower -A django_project --address=127.0.0.1 --port=5555


And boom! That's it. Pretty simple. Now you can go to http://127.0.0.1:5555 and start monitoring your tasks:



Final Thoughts

Thanks for reaching the end of this post. Remember that Celery is a very useful tool, but you must use it wisely: having a lot of workers and running a huge number of tasks on a queue can get messy in production if your server can't handle the load.

But before you leave, let me know your thoughts about this post. Anything you would like to add? Is it good already?! See you later!