Зачем приложению на Django нужен Celery
Если при программировании на Python вы работаете с Django, то знаете, что этот фреймворк позволяет реализовать полную логику приложения без использования сторонних компонентов. Однако с некоторыми ситуациями инструмент справляется плохо. Так, синхронная обработка событий в режиме многозадачности может сильно затормозить или полностью остановить работу Python-приложения.
Решить проблему помогают инструменты для управления потоком событий в асинхронном режиме, например Celery для Django. Система получает сообщения из очереди и выполняет задания по расписанию или заданному алгоритму. В качестве брокера очередей можно выбрать RabbitMQ, Amazon SQS и другие сервисы.
Ключевых функций у Celery две:
- асинхронная обработка рабочих процессов;
- определение момента старта обработки задачи с помощью планировщика.
Система снимает с Python-приложения нагрузку по обработке ресурсоемких операций, которые могут работать независимо от основного потока. Кроме этого, инструмент позволяет назначать определенное время для обработки событий, в том числе выполнять повторяющиеся задания. Такие возможности используются при отправке email или push-уведомлений, сборе отзывов пользователей, аналитике, обработке сложных запросов и фоновых событий.
Как настроить Celery
Инструмент несовместим с Windows, можно работать на Linux или через терминал Ubuntu.
Алгоритм настройки:
- выбор брокера;
- установка пакетов через pip install;
- создание экземпляра приложения и настройка его параметров;
- определение заданий для асинхронной обработки.
Пятый, необязательный шаг — настройка серверной базы данных в случае, если требуется отслеживание результатов выполнения заданий. На механизм обработки событий наличие или отсутствие БД не влияет. Сервер нужен только для учета и регистрации данных об успешной или неуспешной обработке.
Выбор брокера сообщений
На первом шаге нужно выбрать хранилище, из которого Celery должна брать задания. Без брокера сообщений система будет выдавать ошибку из-за постоянных попыток подключения к источнику инструкций.
Лучше выбирать между RabbitMQ и Redis. RabbitMQ считается более надежным брокером сообщений, подходящим для большинства приложений на Django. Он эффективно обрабатывает большие сообщения, но если они приходят быстро и в большом количестве, то это может вызвать проблемы с масштабированием. Redis аналогичен по функциональности. Он хорошо работает для быстрой передачи небольших сообщений, большие могут перегрузить систему. Поэтому у этого брокера риск потери данных выше при техническом сбое и некорректном завершении процессов.
Для установки RabbitMQ на Ubuntu выполните команду:
sudo apt-get install rabbitmq-server
Для запуска на Docker выполните:
docker run -d -p 5672:5672 rabbitmq
Для установки Redis на Ubuntu выполните команду:
sudo apt install redis-server
Для запуска на Docker используйте команду:
docker run -d -p 6379:6379 redis
Установка пакетов
Для интеграции системы в приложение на Django нужно установить пакеты в вашу виртуальную среду. Это можно сделать через стандартные инструменты Python.
Команда для установки пакетов через pip:
pip install celery
В архитектуре появится брокер сообщений и система для их асинхронной обработки. Осталось подключить и настроить само приложение, из которого должны поступать задания.
Создание экземпляра приложения
В папке celery_django приложения управления создайте файл celery.py, откройте его в редакторе и добавьте следующий фрагмент:
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "celery_django.settings")
app = Celery("celery_django")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
В файл __init__.py нужно добавить:
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
В файл settings.py нужно добавить следующие строки (на примере использования Redis для брокера сообщений и сервера БД для хранения результатов):
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0"
Параметры в первой строке settings.py указывают приложению, откуда система распределения процессов будет брать задания, то есть по какому адресу нужно отправлять сообщения. Вторая строка содержит адрес для регистрации результатов. В нашем примере адреса совпадают, так как используется одна и та же база Redis. Если в роли хранилища сообщений выступает RabbitMQ, адреса будут разными. В качестве тестового URL для RabbitMQ можно использовать amqp://myuser:mypassword@localhost:5672/myvhost.
После запуска продукта в рабочей среде нужно заменить URL в settings.py на реальные адреса обеих служб. Далее для брокера сообщений мы будем использовать «your_broker_url».
Определение задач
Создайте файл tasks.py:
from celery import Celery
app = Celery('tasks', broker='your_broker_url')
@app.task
def add(x, y):
return x + y.
@app.task определяет настройки фоновых задач. В нашем примере функция складывает значения X и Y.
Другие возможности:
- ignore_result = True — игнорирование результата выполнения задания;
- rate_limit — ограничение скорости обработки заданий;
- retry — количество попыток с паузами на указанный временной интервал.
Для асинхронного вызова и управления потоком задач предусмотрены следующие методы:
- delay() — стандартный метод;
- apply_async() — задание времени, приоритета и других параметров обработки событий;
- group() — параллельная обработка нескольких заданий;
- signature() — формирование подписи события;
- chain() — объединение несколько заданий в связанную последовательность с передачей каждому следующему заданию результата предыдущего задания в качестве аргумента;
- chord() — комбинация chain() и group() для обработки набора заданий с последующим вызовом задания с результатами этого набора.
Самый простой метод вызова — delay(), он возвращает экземпляр AsyncResult, который можно использовать для контроля статуса задания, ожидания завершения процесса, получения результата или данных о неудачной попытке обработки запроса. Функция учета результатов по умолчанию не активируется. Если логика ПО требует отслеживание состояния и результатов, нужно настроить сервер Redis, MongoDB, Memcached, PostgreSQL или любой другой на ваш выбор.
Примеры применения Celery
Система позволяет перенести процессы, требующие вычислительных мощностей, из приложения в распределенную очередь.
Отправка сообщений
Автоматическое подтверждение действий по email, письмо со ссылкой для регистрации, код для сброса пароля — всю отправку сервисных сообщений удобно делегировать Celery. Это помогает сохранить высокую скорость работы приложения и без задержек доставлять нужные сообщения множеству одновременно подключенных пользователей.
Кроме электронных писем, можно рассылать другие виды сообщений, например push-уведомления и СМС.
Задачи по расписанию
Если нужно выполнять операцию в указанное время или регулярно, можно использовать параметр eta или celerybeat schedule. Метод подходит для публикаций постов по расписанию, отправки напоминаний пользователям в конкретный день, контроля дат оплаты или окончания бесплатного пробного периода.
Генерация отчетов
Выполнение длительных заданий, например составление крупного отчета или сложных расчетов, лучше перевести в асинхронный режим, чтобы не парализовать работу всего сервиса. Операции по извлечению данных или их обработке, анализу или формированию документа, его сохранению или отправке на указанный адрес лучше проводить за пределами приложения на Django.
Автоматическая модерация текста
В веб-сервисах с возможностью добавления пользовательского контента нужны алгоритмы проверки текста. С помощью асинхронной обработки задач можно переводить тексты на другие языки, искать запрещенные слова для бана, анализировать содержание сообщений пользователей перед публикацией.
Обработка очередей
Один пользователь может создавать несколько запросов, которые нужно обрабатывать по порядку. Приведем пример:
- пользователь делает запрос на генерацию отчета;
- Django-приложение отправляет этот запрос на обработку в асинхронном режиме;
- во время обработки веб-сервис сохраняет полную функциональность, поэтому пользователь делает еще один запрос — например, генерирует другой отчет;
- запрос на второй отчет ставится в очередь, что также не влияет на работоспособность приложения.
Реализовать такой алгоритм можно с помощью методов chain() и apply_async().
Таким образом, система асинхронной обработки заданий позволяет оптимизировать работу приложения и корректно распределить потоки событий. Использование этого инструмента помогает Python-разработчикам выпускать устойчивые масштабируемые продукты с хорошим запасом производительности на случай прироста базы пользователей или расширения функциональности ПО.
Для ускорения работы с исходным кодом на Python удобно использовать AI, например GIGA CODE. Это нейросетевой ассистент программиста, встроенный в платформу разработки и управления проектами GitVerse. Кроме работы с кодом, на GitVerse можно хранить исходники, управлять версиями продукта, вести совместную разработку. Для доступа к платформе зарегистрируйтесь на GitVerse.