Asynchronous Tasks in Django with Celery and RabbitMQ

Aug. 20 2021 Yacine Rouizi

Letting your main web server handle time-consuming tasks is a bad idea. Imagine a user sending a request that takes several seconds to process. During all that time, the user is stuck waiting for a response and cannot do anything else.

This problem can be encountered, for example, in the following cases:

  • Sending emails
  • Image processing
  • Exporting data
  • Generating reports
  • ...

To work around this problem, we can use another server that performs these operations in the background while the main server responds to client requests.

This is where Celery comes in ...

How Does Celery Work?

Celery is a powerful asynchronous task queue based on distributed message passing that allows us to run time-consuming tasks in the background.

Celery uses a message broker to communicate with workers. To start a task, the client (here, the Django application) adds a message to a queue. The broker delivers that message to a Celery worker, which then runs the task.

Celery workers are simply processes that are constantly monitoring task queues for new work to perform.
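
The flow described above can be sketched with nothing but the Python standard library. This is only an analogy, not how Celery is implemented: an in-memory queue.Queue stands in for the broker, a thread stands in for the worker process, and all names here are made up for illustration.

```python
import queue
import threading

# Stand-in for the broker's queue (RabbitMQ plays this role in the tutorial).
task_queue = queue.Queue()
results = []


def add(x, y):
    return x + y


def worker():
    """Pull task messages off the queue until a None sentinel arrives."""
    while True:
        message = task_queue.get()
        if message is None:
            break
        func, args = message
        results.append(func(*args))


# Start the worker first; it blocks on the empty queue until work arrives.
worker_thread = threading.Thread(target=worker)
worker_thread.start()

# The "web server" side only enqueues messages and moves on.
task_queue.put((add, (5, 5)))
task_queue.put((add, (2, 3)))
task_queue.put(None)  # sentinel: tell the worker to stop

worker_thread.join()
print(results)
```

With a real broker, the producer and the worker are separate processes (possibly on separate machines), and the message carries the task name and arguments rather than a function object.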

Install RabbitMQ as a Message Broker

As we said above, Celery communicates with the workers through a message queue using a broker. In this tutorial, we will use RabbitMQ, but Celery supports other message brokers like Redis and Amazon SQS.

Here I will show you how to install RabbitMQ on Ubuntu, but if you have another operating system, please check the Downloading and Installing RabbitMQ page to see how to install it.

First, let's install RabbitMQ on Ubuntu by running the following command:

$ sudo apt-get install rabbitmq-server

Now that we have RabbitMQ installed, we need to enable it and start the service:

$ sudo systemctl enable rabbitmq-server
$ sudo systemctl start rabbitmq-server

Then check to see if the service is working using the command below:

$ sudo systemctl status rabbitmq-server

You should see something like this:

● rabbitmq-server.service - RabbitMQ Messaging Server
     Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
     Active: active (running) since Fri 2021-08-06 20:41:38 CEST; 1min 0s ago
   Main PID: 39385 (beam.smp)
     Status: "Initialized"
      Tasks: 235 (limit: 18888)
     Memory: 95.6M
     CGroup: /system.slice/rabbitmq-server.service
             ├─39381 /bin/sh /usr/sbin/rabbitmq-server
             ├─39385 /usr/lib/erlang/erts-11.0.3/bin/beam.smp -W w -K true -A 192 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 104857>
             ├─39637 erl_child_setup 65536
             ├─39727 inet_gethost 4
             └─39728 inet_gethost 4

Aug 06 20:41:36 rouizi systemd[1]: Starting RabbitMQ Messaging Server...
Aug 06 20:41:38 rouizi systemd[1]: Started RabbitMQ Messaging Server.

Project Setup

Before we start, grab a copy of the project from my GitHub repository and then check out the v0 tag:

$ git clone https://github.com/python-dontrepeatyourself/django-celery-rabbitmq
$ cd django-celery-rabbitmq
$ git checkout v0

Then create a virtual environment, install the dependencies, and run the development server:

$ python3 -m venv venv
$ source venv/bin/activate
(venv) $ pip install -r requirements.txt
(venv) $ python manage.py runserver

Then, in a terminal where the virtual environment is active, install Celery:

(venv) $ pip install Celery

Integrating Celery with Django

In order to use Celery with the Django application, we need to define a Celery instance. To do that, create a new file called celery.py where settings.py is located and add the following code:

# mysite/celery.py
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

So basically, we create an app instance and use the Django settings module as a configuration source for Celery. This allows us to configure Celery from the Django settings module.

Using namespace='CELERY' tells Celery to read only the settings that are uppercase and start with the `CELERY_` prefix. For example, Celery's broker_url setting is written as CELERY_BROKER_URL in settings.py.
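
To make the naming rule concrete, here is a small sketch of the mapping. It is not Celery's actual loading code; the helper celery_settings_from and the sample dictionary are hypothetical and only illustrate how a prefixed, uppercase Django setting corresponds to a lowercase Celery setting name.

```python
def celery_settings_from(django_settings, namespace="CELERY"):
    """Illustrative only: keep NAMESPACE_-prefixed uppercase keys, strip the prefix."""
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in django_settings.items()
        if key.startswith(prefix) and key.isupper()
    }


# A made-up subset of what settings.py might contain.
django_settings = {
    "DEBUG": True,
    "CELERY_BROKER_URL": "amqp://localhost",
    "CELERY_TASK_SERIALIZER": "json",
    "celery_ignored": "lowercase, so it is skipped",
}

mapped = celery_settings_from(django_settings)
print(mapped)
```

Settings without the prefix, such as DEBUG, are left to Django and never reach Celery.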

Tasks specific to each app of our Django application should live in a tasks.py module, and by adding app.autodiscover_tasks() Celery will automatically discover those tasks.

To ensure the app is loaded when Django starts, we need to import the Celery app we defined above in mysite/__init__.py:

# mysite/__init__.py
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

By default, Celery uses RabbitMQ as the message broker. If you prefer to specify it explicitly, put the following in your settings.py file:

CELERY_BROKER_URL = 'amqp://localhost'

If you want to use Redis instead of RabbitMQ as the message broker, change the CELERY_BROKER_URL setting to:

CELERY_BROKER_URL = 'redis://localhost:6379/'

Celery will find where the broker is running by using this URL.
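
A broker URL is an ordinary URL, so its parts can be inspected with the standard library. The sketch below is only meant to show what each part of the URL means; describe_broker is a hypothetical helper, and Celery does its own parsing internally.

```python
from urllib.parse import urlsplit


def describe_broker(url):
    """Split a broker URL into the pieces that identify the broker."""
    parts = urlsplit(url)
    return {
        "transport": parts.scheme,  # amqp for RabbitMQ, redis for Redis
        "user": parts.username,     # None when the URL has no credentials
        "host": parts.hostname,
        "port": parts.port,         # None when the URL omits the port
    }


for url in (
    "amqp://localhost",
    "redis://localhost:6379/",
    "amqp://guest:guest@localhost:5672//",
):
    print(describe_broker(url))
```

When parts such as the port or the user are omitted, the transport's defaults apply.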

Now create a new file named tasks.py inside the core app and add the following task:

# core/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

This is a super simple task that adds two numbers together.

We are going to use it just to see that Celery is working properly and receiving the task.

Now we need to start a worker to process the task asynchronously. Open a new terminal window, navigate to the project directory, and activate the virtual environment.

To get the list of command-line options available, you can use the --help flag:

(venv) $ celery --help
Usage: celery [OPTIONS] COMMAND [ARGS]...

  Celery command entrypoint.

Options:
  -A, --app APPLICATION
  -b, --broker TEXT
  # ...

Commands:
  amqp     AMQP Administration Shell.
  beat     Start the beat periodic task scheduler.
  # ...
  worker   Start worker instance.

So to run the worker process we can use the celery worker command with the -A flag to specify the Celery application:

(venv) $ celery -A mysite worker -l info
 
 -------------- celery@rouizi v5.1.2 (sun-harmonics)
--- ***** ----- 
-- ******* ---- Linux-5.8.0-63-generic-x86_64-with-glibc2.32 2021-08-04 05:12:58
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         mysite:0x7fc6c97d5bb0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . core.tasks.add
# ...

The -l info option sets the log level to INFO. For even more detail, use -l debug.

To run the task asynchronously we can use the delay() method:

(venv) $ python manage.py shell
>>> from core.tasks import add
>>> add.delay(5, 5)
<AsyncResult: 5fe054d6-58cd-488a-9781-89604f691b8b>

The delay() method is a shortcut for the apply_async() method, which can be used as follows:

>>> add.apply_async((5, 5))
<AsyncResult: ec29c201-8d1f-4643-974b-09cfdd41862d>

The apply_async() method gives us more control over how the task is executed. For example, we can use the countdown option to run the task at some point in the future:

>>> add.apply_async((5, 5), countdown=60)

In the above example, the task will be executed no sooner than one minute after it is sent. It may start later if the workers are busy.
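
A countdown is a relative delay; the same idea can be expressed as an absolute time. The sketch below only does the date arithmetic with the standard library. The helper eta_from_countdown is hypothetical, and the commented-out call assumes the add task and a running worker from this tutorial.

```python
from datetime import datetime, timedelta, timezone


def eta_from_countdown(seconds, now=None):
    """Return the earliest time a task sent at `now` with this countdown may run."""
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=seconds)


# A fixed send time keeps the example reproducible.
sent_at = datetime(2021, 8, 4, 12, 0, 0, tzinfo=timezone.utc)
eta = eta_from_countdown(60, now=sent_at)
print(eta.isoformat())

# With a running worker, the same schedule could be requested explicitly
# through apply_async's eta option:
# add.apply_async((5, 5), eta=eta)
```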

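As shown above, both delay() and apply_async() return an AsyncResult instance. When a result backend is enabled, we can use that instance to keep track of the task's execution state. No backend is configured here (the worker banner shows results: disabled://), so the return value cannot be retrieved this way yet.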

If you return to the terminal where the worker process is running you can see that the task was received and executed:

[2021-08-04 12:19:38,429: INFO/MainProcess] Task core.tasks.add[261ab078-afc4-46a6-b672-b19c55ab7de4] received
[2021-08-04 12:19:38,430: INFO/ForkPoolWorker-8] Task core.tasks.add[261ab078-afc4-46a6-b672-b19c55ab7de4] succeeded in 0.0001843449999796576s: 10

Sending Asynchronous Emails with Celery

Hopefully, now you have a better understanding of how Celery works and how to implement it in a Django application. So in this section, we are going to see how to send emails asynchronously leaving the Django application free to respond to requests.

We will create a simple form where users can enter their email address. When the form is submitted, the view validates it and calls the task, which adds a new task message to the message queue. The Celery worker then picks up the task from the queue and sends an email to that address.

First, create the file forms.py inside the core application and add this form:

# core/forms.py
from django import forms

class SendEmailForm(forms.Form):
    email = forms.EmailField()

Now, create the templates directory inside the core app and create the index.html file inside the templates directory.

Here is the HTML that goes inside the index.html file:

<!-- core/templates/index.html -->
<!doctype html>
<html lang="en">
  <head>
    <!-- Required meta tags -->
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">

    <!-- Bootstrap CSS -->
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous">
    <title>Django Celery</title>
  </head>
  <body>
    <div class="container mt-5">
      <form method='post'>
        {% csrf_token %}
        <p>Send email to: <input type="text" name="email" class="form-control" id="id_email" placeholder="Email"></p>
        <button type="submit" class="btn btn-primary">Send</button>
      </form>
    </div>
  </body>
</html>

Now we need to configure the email server and I am going to use Gmail in this case.

Open settings.py and add this configuration at the end of the file:

# mysite/settings.py
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_PORT = 587
EMAIL_HOST_USER = 'alfabravo318@gmail.com' # put your gmail address here
EMAIL_HOST_PASSWORD = '<your-gmail-password>'
EMAIL_USE_TLS = True

If you don't want to send real emails using the SMTP backend, you can use the console backend to send emails to the standard output. In this case, you need to replace the previous code with this code:

EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
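
Hard-coding an email address and password in settings.py makes it easy to commit them by accident. One common alternative is to read them from environment variables. The sketch below is one way to do that, not part of the original project: the helper email_settings and the environment variable names are assumptions, and it falls back to the console backend when no credentials are set.

```python
import os


def email_settings(environ=os.environ):
    """Build email settings from environment variables (names are hypothetical)."""
    user = environ.get("EMAIL_HOST_USER", "")
    password = environ.get("EMAIL_HOST_PASSWORD", "")
    if user and password:
        backend = "django.core.mail.backends.smtp.EmailBackend"
    else:
        # No credentials available: print emails to the console instead.
        backend = "django.core.mail.backends.console.EmailBackend"
    return {
        "EMAIL_BACKEND": backend,
        "EMAIL_HOST_USER": user,
        "EMAIL_HOST_PASSWORD": password,
    }


# In settings.py the values would be assigned to module-level names.
_email = email_settings()
EMAIL_BACKEND = _email["EMAIL_BACKEND"]
EMAIL_HOST_USER = _email["EMAIL_HOST_USER"]
EMAIL_HOST_PASSWORD = _email["EMAIL_HOST_PASSWORD"]
```

The variables would be exported in the shell that starts both the Django server and the Celery worker, since the worker also loads settings.py.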

Next, add the view to views.py:

# core/views.py
from django.shortcuts import render

from .forms import SendEmailForm
from .tasks import send_email_task # we will define this function later


def index(request):
    if request.method == 'POST':
        form = SendEmailForm(request.POST)
        if form.is_valid():
            email = form.cleaned_data['email']
            # Queue the task and return immediately; a Celery worker sends the email.
            send_email_task.delay(email)
            return render(request, 'index.html', {'form': form})

    # GET request (or invalid POST): render an empty form.
    form = SendEmailForm()
    return render(request, 'index.html', {'form': form})

This is a standard view that handles both GET and POST requests. The interesting part is send_email_task.delay(email), which hands the email task to Celery so that a worker runs it asynchronously. If we called send_email_task(email) directly instead of using .delay(), the task would run synchronously in the current process and the user would wait for it to finish.
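
The difference between calling a function directly and handing it off can be measured with a small standard-library experiment. This is an analogy only: a thread pool stands in for the Celery worker (which in reality is a separate process reached through the broker), and slow_send is a made-up function that just sleeps.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_send(email):
    """Pretend to send an email; the sleep simulates the slow part."""
    time.sleep(0.2)
    return f"sent to {email}"


# Direct call: the caller is blocked until the work is done.
start = time.perf_counter()
slow_send("user@example.com")
blocking = time.perf_counter() - start

# Hand-off: submit() returns at once and the work runs in the background.
with ThreadPoolExecutor(max_workers=1) as pool:
    start = time.perf_counter()
    future = pool.submit(slow_send, "user@example.com")
    handoff = time.perf_counter() - start
    result = future.result()  # wait here only to show the outcome

print(f"direct call blocked for {blocking:.2f}s")
print(f"hand-off returned after {handoff:.4f}s")
print(result)
```

In the view, the hand-off corresponds to .delay(): the request returns as soon as the message is queued, not when the email has been sent.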

Then register the view in urls.py:

# mysite/urls.py
from django.contrib import admin
from django.urls import path

from core.views import index # add this

urlpatterns = [
    path('admin/', admin.site.urls),
    path('send-email/', index, name='send_email') # add this
]

Finally, add the email task to tasks.py:

# core/tasks.py
import time
from celery import shared_task

from django.core.mail import send_mail

# ...

@shared_task
def send_email_task(email):
    """Background task to send an email asynchronously."""
    subject = 'Hello from Celery'
    message = 'This is a test email sent asynchronously with Celery.'
    
    time.sleep(5)
    return send_mail(
        subject,
        message,
        'alfabravo318@gmail.com',
        [email],
        fail_silently=False
    )

We are using Django's send_mail function to send an email. The task sleeps for 5 seconds before sending the email just to simulate a long-running task.

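To allow Django to access your Gmail account, you'll need to turn on the "Less secure app access" option in your Google account settings. Google has since phased this option out for most accounts, so you may need to create an app password (which requires 2-Step Verification) and use it as EMAIL_HOST_PASSWORD instead.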

After that, fill in the form and hit send. You will see that the Django server is free to respond to new requests while the Celery worker is processing the task.

Here is a screenshot that shows how the Celery worker processed the task:

[Screenshot: Celery worker processing the task]

In this example, I used the console backend to make things easy and see how the worker executes the task and sends the email in real-time.

Summary

In this tutorial, we saw why and how to use Celery to run asynchronous tasks in a Django app. In a future post, we will see how to use a result backend to check the status of a Celery task.

If you have any questions or want to say something please leave a comment in the section below. Also, if you like the content on this blog, be sure to share and subscribe to the mailing list to be notified of future posts.

The code used in this tutorial is available on GitHub: https://github.com/python-dontrepeatyourself/django-celery-rabbitmq
