Python. Celery

Введение

Celery - ПО для работы с, так называемыми, задачами. Идея в том, что мы создаем объект Celery и декорируем им функции, которые планируем обрабатывать с помощью api Celery, это и есть наши задачи. Обычно все сводится к фоновой работе каких-то, долго исполняемых, задач. Наш синхронно работающий python и его синхронно работающий Django (или прочие фреймворки) исполняет задачи сверху вниз, поэтому если в очередь на исполнение встала задача, которая будет выполняться долго, все приложение перестанет отвечать пока эта задача не выполнится. Вы возможно замечали на каких-нибудь сайтах, как он после какого-то действия становится не отзывчив к нажатиям, а через время это проходит, так вот скорее всего вызвано это тем, что в это время выполняется какая-то не мгновенная задача. Celery призван решать эту проблему, посредством выполнения долго исполняемых задач в фоновом режиме.

Как я упомянул выше, объект Celery, которым мы декорируем какую-то функцию, превращает эту функцию в объект, который мы будем называть task.
Для того чтобы обрабатывать эти task'и, существуют другие объекты Celery, которые называются worker.
И последней компонент успешно работающего Celery - broker. broker выполняет роль распределителя задач между worker'ами. Для успешной работы broker должен быть активен, то есть broker поднимается, скажем как web-сервер, и пока он поднят он распределяет task'и между worker'ами и следит, чтобы вся эта схема работала. Результаты выполненных задач должны где-то хранится, и эту задачу на себя также берет broker (в celery мы будем называть это backend). Дело в том, что для хранения данных нужна какая-то база, но не такая база, которые мы привыкли видеть, а база типа NoSQL. И backend task'ов как раз на таких базах данных и построен. В качестве broker'ов выделяют redis и rabbitMQ, есть еще некоторые менее популярные, но будет достаточно знакомства с двумя. И redis и rabbitMQ могу выступать как и в качестве broker'а, так и в качестве backend'а, но rabbitMQ в качестве бэкенда не совсем самостоятелен, чаще используют связку rabbitMQ - broker, redis - backend. Как пишут сами разработчики, "redis более подвержен потере данных в случае внезапного завершения работы или сбоев питания", возможно именно по этому rabbitMQ предпочитают использовать в качестве broker'а чаще чем redis, ну и плюс потому что rabbitMQ поддерживает AMQP (Расширенный протокол очереди сообщений), обсудим этот момент подробнее позже. Но познакомимся мы в этом материале и с redis и с rabbitMQ.

Надеюсь картинка в вашей голове, пусть и размытая, сформирована и мы можем написать первый самый простой пример, который наглядно продемонстрирует все, что написано выше.

официальная документация celery

Демо. Task

Для материалов по celery я завел новую директорию, создал виртуальное окружение и добавил gitignore. Поэтому к моменту как вы это читаете все разобранные примеры должны быть на моем github в соответствующем репозитории.
Я рекомендую вам взять за правило использовать этот минимальный набор подготовительных работ. Польза такой структурированности ваших проектов по изолированным виртуальным окружениям и хранении их на github будет становиться все ощутимей по мере роста количества этих самых проектов.
Скорее всего вы все равно придете к этому, с моего совета, с чьего-то еще, самостоятельно, но придете. Так что, если вы еще так не делаете, советую прислушаться.

Для работы с celery его надо установить.

И заодно установим сразу и redis и rabbitMQ.

pip install celery
pip install rabbitmq
pip install redis
sudo apt-get install redis-server

Теперь создадим пустой файл и напишем там следующее

code celery_first_example.py
from celery import Celery
import datetime

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost')


@app.task
def now_time():
    return datetime.datetime.now()

app содержит объект Celery, состоящий в нашем случае из имени - 'tasks', брокера и бэкенда. На данный момент будем использовать redis для обеих сущностей.
Брокер должен содержать адрес, для redis можно использовать такой - 'redis://localhost:6379/0'. 6379 - стандартный redis port. 0 - NoSQL база данных, в redis они называются цифрами, от 0 и далее. redis://localhost - означает, что мы используем redis и крутиться он будет на локальном хосте - 127.0.0.1.
Бэкенд также содержит адрес, достаточно указать, имя и на каком устройстве он будет крутиться.

Теперь этим объектом мы можем декорировать произвольное число функций и к ним будут применимы методы Celery. Соответственно объектов Celery, при необходимости, мы тоже можем создать несколько, с разными брокерами и бэкендами.

Ну и сама функция просто возвращает текущие дату и время.

Попробуем перейти в консоль и поставить задачу в очередь на выполнение

Python Console
>>> from simple_examples.celery_first_example import now_time
>>> now_time.delay()
<AsyncResult: 497d3763-3842-4f4b-9aa8-f4b229bd331e>
>>> result = now_time.delay()
>>> result.get()
>>> result_2 = now_time.delay()
>>> result_2.get()

Для отправки задачи в очередь существует метод .delay(), и если функция содержит аргументы то передаются они также в этот метод.
Возвращает .delay() объект AsyncResult.
К этому и объекту уже применимы методы для работы с задачей и этот объект мы и будем обрабатывать, для этого сохраним его в переменную.
Метод .get() используется для получения результата задачи и в данному случае мы должны были после применения этого метода увидеть дату и время, но не увидели. Создадим еще одну переменную, содержащую AsyncResult и применим .get() к ней, результата мы также не видим, но и никакими ошибками вызовы не завершаются.

В PyCharm - Python Console есть такой интерфейс как Show Command Queue

Выглядит он так. Мы видим, что первый вызов .get() встал в очередь и все последующие команды также встают в эту очередь. И случилось это потому что некому брать эти задачи в работу, нет тех самых воркеров, о которых я упоминал во вступлении. Давайте запустим воркеров

Terminal
$ celery --app=simple_examples.celery_first_example:app worker
 -------------- celery@tsarkovilya v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.15.0-47-generic-x86_64-with-glibc2.35 2022-09-16 19:58:33
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f612f2e1c60
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

Команда для запуска начинается с флага --app через знак равно указывается путь до файла, где лежит объект Celery, (app в нашем случае) через двоеточие имя объекта Celery и команда worker, который запустит воркеров.

После запуска команды вы уведите подобную системную информацию.
Самая верхняя строка - имя воркера, сформировано оно автоматически
Далее информация об устройстве
После config, включает он:
app - объект Celery
transport - broker
results - backend
concurrency - количество выделенных prefork процессов, которые и будут обрабатывать наши задачи
task events - логирование действий воркеров, нужно по большей степени при использовании мониторингов, пока мы об этом не говорили и данную функцию поэтому пока трогать не будем
queues - информация о запущенных очередях, по умолчанию используется одна очередь - celery.

Воркеры запущены, теперь можно вернуться в Python Console и посмотреть произошли-ли какие-нибудь изменения

Python Console
...
>>> result_2.get()
'2022-09-16T19:58:34.708551'
'2022-09-16T19:58:34.734809'

Произошли. Все задачи, находящиеся в очереди, выполнились сразу, как только были запущены воркеры.

Вот на таком не хитром примере мы разобрались со скелетом Celery, далее будем говорить об отдельных возможностях этого инструмента подробнее.

Конфигурация

Разобравшись с логикой и идеей задач, становится понятно, что при внедрении Celery в ваши проекты, вы всегда будете использовать для работы некоторые настройки. Конечно, первым делом - настройки для брокера и для бэкенда, плюс присутствует еще ряд настроек, которые мы можем изменять. Как и в случае любой другой утилиты, которая так или иначе требует нашего участия в ее конфигурации, мы хотим вынести все эти настройки в отдельный файл, или как, например, в случае с Django в общий файл с настройками, где помимо самих настроек мы храним и наши конфигурационные константы.
И Celery разумеется не исключение, чаще вы будете видеть настройки брокера и прочего в отдельном файле, а не в качестве аргументов при объявлении объекта Celery.

Предлагает Celery для этого несколько вариантов, но самый предпочтительный это, конечно, вынесение всех настроек в отдельный файл.
У меня файл celery_first_example.py находится в папке simple_examples, создам в этой же папке файл celeryconfig.py со следующим содержанием

code celeryconfig.py
REDIS_HOST = 'redis://localhost'
REDIS_PORT = '6379'
REDIS_BD = '0'

broker_url = REDIS_HOST + ':' + REDIS_PORT + '/' + REDIS_BD
result_backend = REDIS_HOST

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Moscow'
enable_utc = True

По большей части взяты настройки из документации, от себя я добавил объявление констант для редиса и формирование из них адреса брокера и бэкенда, именно в таком формате вы скорее всего будете сталкиваться с конфигурацией Celery.

Построчно тут особо не на чем заострять внимания, настройка брокера, бэкенда, обработка данных в формате json, времени и прочего. Со всеми этими настройками можно ознакомиться тут, там все подробно описано, поэтому не станем отдельно подробно на этом останавливаться.

Файл конфигурации пока не связана с объектом Celery. Вам могло показаться, что Celery сам будет искать файл с названием celeryconfig.py и брать от туда настройки, я тоже поначалу так думал, но на самом деле это условие совсем необязательное, можно использовать любое название для этого файла.

Подключим файл следующим образом.

code celery_first_example.py
from celery import Celery
import datetime

app = Celery('tasks')
app.config_from_object('simple_examples.celeryconfig')


@app.task(name='what-time-is-it')
def now_time():
    return datetime.datetime.now()


@app.task(name='what-amount')
def amount(x, y):
    return x + y

Уберем из аргументов app настройки написанные ранее, оставим только имя, а ниже воспользуемся методом .config_from_object(), в качестве аргумента метод принимает путь до файла с конфигурацией.

Плюс я добавил еще одну задачу, просто для демонстрации, как объявить несколько задач, как видно, все максимально очевидно.

И добавил аргумента name для каждой задачи, задачи имеют рад настраиваемых аргументов, позже посмотрим на некоторые. Имя каждой задачи должно быть уникальным и если не задавать его вручную оно сформируется автоматически.

Теперь можно запустить воркеров и убедиться, что файл конфигурации успешно привязан к объекту Celery, если это не так вы увидите сообщение об ошибке.

Если ошибок вы не увидели, но все равно сомневаетесь, что ваша конфигурация применена, попробуйте изменить какую-нибудь настройку, например допустить ошибку в настройке для брокера и посмотреть на изменения.

Запускать воркеров можно с параметрами, все их можно увидеть по команде celery worker --help, например, воспользуемся флагом --loglevel, сокращенно -l и укажем явно, что нас интересует логирование уровня INFO. Также добавим флаг -E, который активирует --task-events, но как упоминалось выше без мониторинга для нас этот флаг ничего не изменит.

Terminal
$ celery --app=simple_examples.celery_first_example:app worker -E --loglevel INFO
 -------------- celery@tsarkovilya v5.2.7 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-5.15.0-47-generic-x86_64-with-glibc2.35 2022-09-18 16:17:53
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f9043a11f90
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: ON
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . what-amount
  . what-time-is-it

[2022-09-18 16:17:53,850: INFO/MainProcess] Connected to redis://localhost:6379/0
[2022-09-18 16:17:53,854: INFO/MainProcess] mingle: searching for neighbors
[2022-09-18 16:17:54,862: INFO/MainProcess] mingle: all alone
[2022-09-18 16:17:54,870: INFO/MainProcess] celery@tsarkovilya ready.

Как видно с флагом для логирования мы сразу видим дополнительную информацию. Давайте запустим несколько задач и посмотрим какую информацию мы будем при этом логировать.

Python Console
>>> from simple_examples.celery_first_example import *
>>> res = now_time.delay()
>>> other_res = amount.delay(10, 11)
>>> res.get()
'2022-09-18T16:21:38.329214'
>>> other_res.get()
21
Terminal
...
[2022-09-18 16:17:54,870: INFO/MainProcess] celery@tsarkovilya ready.
[2022-09-18 16:21:38,327: INFO/MainProcess] Task what-time-is-it[c5bca45f-9e10-4b8a-8ce2-7a897e935046] received
[2022-09-18 16:21:38,334: INFO/ForkPoolWorker-8] Task what-time-is-it[c5bca45f-9e10-4b8a-8ce2-7a897e935046] succeeded in 0.00525428900073166s: datetime.datetime(2022, 9, 18, 16, 21, 38, 329214)
[2022-09-18 16:21:42,839: INFO/MainProcess] Task what-amount[c0cf428b-a4ab-41b3-8d0c-a250654fa99b] received
[2022-09-18 16:21:42,842: INFO/ForkPoolWorker-8] Task what-amount[c0cf428b-a4ab-41b3-8d0c-a250654fa99b] succeeded in 0.0007629800002177944s: 21

Мы получаем информацию о том какая задача была выполнена, во сколько, за сколько и каков результат ее исполнения. И можно также увидеть, что результат записывается в базу сразу после выполнения задачи, а не при использовании метода .get().

Все прекрасно работает, а значит и наши настройки конфигурации успешно применены.

Periodic Tasks

Иногда может оказаться необходимым исполнять задачи в определенное время, раз в час, раз в месяц, первый день каждого квартала и так далее. Celery позволяет достаточно просто это реализовать. Для примера давайте напишем программу немного поинтереснее, чем писали выше, допустим такую

code tasks.py
import celery
import random
import string
import datetime
import psycopg2
# Модуль для работы с исполнением задач по расписанию
from celery.schedules import crontab

# Конфигурация БД
connection = psycopg2.connect(
    host='127.0.0.1',
    database='celery_example_db',
    user='(имя пользователя)',
    password='(пароль от PgAdmin)',
    # Классический PostgreSQL порт
    port=5432,
)
# Автоматическое применение изменений
connection.autocommit = True

# Объект Celery с импортированными настройками конфигурации
app = celery.Celery('db_work')
app.config_from_object('simple_examples.celeryconfig')

# Конфигурация отправки по времени
app.conf.beat_schedule = {
    'add_note_to_db': {
        # Выполнение задачи раз в минуту
        'task': 'insert-into-table-task',
        'schedule': crontab(minute='*/1'),
    },
}


@app.task(name='create-table-task')
def create_table():
    """Задача на создание таблицы в БД"""

    # Вывод информации о версии postgreSQL и ПК
    with connection.cursor() as cursor:
        # Объект курсора устанавливает соединение с БД
        # Метод .execute() для SQL-запроса
        cursor.execute(
            "SELECT version();"
        )

        # Метод .fetchone() возвращает результат работы .execute()
        print(f'Версия - {cursor.fetchone()}')

    # Создание новой таблицы с полями id и log_info
    with connection.cursor() as cursor:
        cursor.execute(
            """CREATE TABLE results(
                id serial PRIMARY KEY,
                log_info text NOT NULL);"""
        )

        print('Таблица results создана')


@app.task(name='insert-into-table-task')
def insert_into_table():
    """Задача для добавления новой записи в таблицу"""

    # Генерация случайно строки из цифр и букв
    letters_and_digits = string.ascii_letters + string.digits
    some_string = ''.join(random.choice(letters_and_digits) for i in range(random.randint(30, 50)))

    # Добавление сгенерированной строки в таблицу results
    with connection.cursor() as cursor:
        cursor.execute(
            "INSERT INTO results (log_info) VALUES(%s);", [some_string])

        print(f'Запись {some_string} добавлена в таблицу results')

    # Генерация результата, возвращаемого задачей
    res = f'{some_string} добавлена в {datetime.datetime.now()}'
    return res

Создадим БД в postgreSQL и с помощью библиотеки psycopg2 подключимся к ней.

документация psycopg2.

Для установки psycopg2 используйте команду

pip install psycopg2-binary

Подключаемся к БД через .connect(), куда указываем все данные от нашей БД и включаем .autocommit = True. Таким образом все изменения над базой будут применяться без дополнительного подтверждения.

Далее создаем объект Celery, конфигурация точно такая же, как использовалась в предыдущем разделе.

Конфигурацию отправки по времени рассмотрим в последнюю очередь.

Создадим две задачи, первая на создание таблицы, которая будет состоять всего из двух полей, вторая - для добавления случайно сгенерированной последовательности символов в эту таблицу. Методом .cursor() мы активируем соединение с БД и внутри метода .execute() можем писать SQL-запросы классическим SQL-синтаксисом. Ну и поскольку соединение нужно закрывать для сохранения данных и изменений используем менеджер контекста.

Задача по созданию таблицы и выводе служебной информации о БД и ПК не будет ничего возвращать, а задача на добавление новой записи будет возвращать саму сгенерированную запись и время, когда это произошло.

Сначала поднимем воркеров и попробуем выполнить эти задачи через Python Console.

Terminal
$ celery --app=simple_examples.tasks:app worker -l INFO
 -------------- celery@tsarkovilya v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.15.0-48-generic-x86_64-with-glibc2.35 2022-09-25 12:54:44
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         db_work:0x7f7e70a9c160
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . create-table-task
  . insert-into-table-task

[2022-09-25 12:54:44,809: INFO/MainProcess] Connected to redis://localhost:6379/0
[2022-09-25 12:54:44,812: INFO/MainProcess] mingle: searching for neighbors
[2022-09-25 12:54:45,821: INFO/MainProcess] mingle: all alone
[2022-09-25 12:54:45,841: INFO/MainProcess] celery@tsarkovilya ready.
Python Console
>>> from simple_examples.tasks import *
>>> create_table.delay()
<AsyncResult: c43c5881-4e72-4a60-95ed-181533a3ffc2>
>>> insert_into_table.delay()
<AsyncResult: 52b32039-eb16-42b1-85c7-55791a4fee59>

Запустим воркеров с флагом -l INFO, чтобы просматривать информацию об исполняемых задачах. И запустим создание таблицы и добавление записи.

После этого вернемся в терминал и посмотрим информацию

Terminal
...
[2022-09-25 12:55:53,308: INFO/MainProcess] Task create-table-task[c43c5881-4e72-4a60-95ed-181533a3ffc2] received
[2022-09-25 12:55:53,310: WARNING/ForkPoolWorker-7] Версия - ('PostgreSQL 12.11 (Ubuntu 12.11-0ubuntu0.20.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0, 64-bit',)
[2022-09-25 12:55:53,323: WARNING/ForkPoolWorker-7] Таблица results создана
[2022-09-25 12:55:53,328: INFO/ForkPoolWorker-7] Task create-table-task[c43c5881-4e72-4a60-95ed-181533a3ffc2] succeeded in 0.018422689999852082s: None
[2022-09-25 12:56:01,091: INFO/MainProcess] Task insert-into-table-task[52b32039-eb16-42b1-85c7-55791a4fee59] received
[2022-09-25 12:56:01,103: WARNING/ForkPoolWorker-7] Запись kROEX1URp5jKc0b4SNAO3kTiPD7ev5wLKvRKjvsbxjp добавлена в таблицу results
[2022-09-25 12:56:01,105: INFO/ForkPoolWorker-7] Task insert-into-table-task[52b32039-eb16-42b1-85c7-55791a4fee59] succeeded in 0.012491166000017984s: 'kROEX1URp5jKc0b4SNAO3kTiPD7ev5wLKvRKjvsbxjp добавлена в 2022-09-25 12:56:01.103840'

Как видно все отлично работает. Если открыть pgAdmin мы увидим созданную таблицу result и добавленную в нее запись.

Теперь что касается периодических задач. У меня уже написана конфигурация дла выполнения второй задачи раз в минуту. И при текущем запуске я не комментировал эту конфигурацию, запустил все как есть, тем не менее, раз в минуту никакого выполнения задачи не происходит. Перед тем как запустит выполнение давайте взглянем на настройку.

Конфигурация периодичности записывается в методе .beat_schedule в виде словаря, каждая новая запись этого словаря это пара состоящая из имени, которе мы придумываем сами и параметров также в виде словаря. В качестве параметров в самом простом варианте выступает сам task и schedule(расписание) для task. В task мы передаем имя задачи из параметра name, а не имя функцию, которую мы декорируем.

Для настройки расписания используем celery crontab. Вообще cron это как раз и есть утилита для временного исполнения задач, это какая-то уникальная celery утилита, это утилита unix-систем. И используется она, конечно, не только в python. Подробно с примера использования crontab в celery для разных временных промежутков можно найти в этом разделе документации.

Запись crontab(minute='*/1') означает выполнять задачу каждую минуту. Минута это минимально возможное время периодичности, поэтому запись crontab(minute='*/1') можно заменить на crontab() и это будет означать то же самое.

Запускаются периодические задачи похожим образом с воркерами, используем тот же путь, только команда worker заменятся на beat.

Ну и открыть все это дело нужно в другом терминале, поскольку в первом уже запущены воркеры.

Terminal(2)
$ celery --app=simple_examples.tasks:app beat -l INFO
celery beat v5.2.7 (dawn-chorus) is starting.
__    -    ... __   -        _
LocalTime -> 2022-09-25 13:19:46
Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2022-09-25 13:19:46,878: INFO/MainProcess] beat: Starting...
[2022-09-25 13:19:46,906: INFO/MainProcess] Scheduler: Sending due task add_note_to_db (insert-into-table-task)
[2022-09-25 13:20:00,002: INFO/MainProcess] Scheduler: Sending due task add_note_to_db (insert-into-table-task)
[2022-09-25 13:21:00,050: INFO/MainProcess] Scheduler: Sending due task add_note_to_db (insert-into-table-task)
Terminal
...
[2022-09-25 13:19:46,932: INFO/MainProcess] Task insert-into-table-task[b5894245-3ea2-4f50-9bd8-b5826d10ccf5] received
[2022-09-25 13:19:46,933: WARNING/ForkPoolWorker-7] Запись TgQYBCWlBSrduKnCVv7f4vClVTjSuK2Wa8aJLGdIq67L4fY8 добавлена в таблицу results
[2022-09-25 13:19:46,934: INFO/ForkPoolWorker-7] Task insert-into-table-task[b5894245-3ea2-4f50-9bd8-b5826d10ccf5] succeeded in 0.0015785430000505585s: 'TgQYBCWlBSrduKnCVv7f4vClVTjSuK2Wa8aJLGdIq67L4fY8 добавлена в 2022-09-25 13:19:46.934026'
[2022-09-25 13:20:00,005: INFO/MainProcess] Task insert-into-table-task[5ae22c7a-1067-4a06-902f-1512ef0fa4d1] received
[2022-09-25 13:20:00,016: WARNING/ForkPoolWorker-7] Запись bwnfTvC8u97MiS6FVGX1Aq6LeK7wechoJv48jT3KFLQC добавлена в таблицу results
[2022-09-25 13:20:00,017: INFO/ForkPoolWorker-7] Task insert-into-table-task[5ae22c7a-1067-4a06-902f-1512ef0fa4d1] succeeded in 0.010723345999849698s: 'bwnfTvC8u97MiS6FVGX1Aq6LeK7wechoJv48jT3KFLQC добавлена в 2022-09-25 13:20:00.016937'
[2022-09-25 13:21:00,053: INFO/MainProcess] Task insert-into-table-task[a079365b-2db1-4bc9-ad4c-1016f99a8cfc] received
[2022-09-25 13:21:00,063: WARNING/ForkPoolWorker-7] Запись DkvkKsSQqhQu5T5oMMRqtKptNJoWF7hB7w60 добавлена в таблицу results
[2022-09-25 13:21:00,065: INFO/ForkPoolWorker-7] Task insert-into-table-task[a079365b-2db1-4bc9-ad4c-1016f99a8cfc] succeeded in 0.01149353400023756s: 'DkvkKsSQqhQu5T5oMMRqtKptNJoWF7hB7w60 добавлена в 2022-09-25 13:21:00.064120'

При запуске beat мы видим некоторую служебную информацию и далее каждую минуту видим запись о том, что задача отправлена на исполнение. Обратите внимание первая задача запустилась сразу при запуске в 13:19:46, а вторая через 14 секунд ровно в 13:20:00 и последующие задачи запускаются уже в 00 каждой минуты.

В первом терминале мы видим информацию о выполнении каждой задачи, таким образом мы достаточно просто реализовали автоматическое ежеминутное добавление записи в БД.

Если вам вдруг понадобится выполнять задачи чаще чем раз в минут, то crontab вам не подойдет, вы можете использовать, например, .timedelta() и datetime, куда передадите количество секунд и в аком случае все тоже будет работать.

'schedule': datetime.timedelta(seconds=10),

flower

Каждый раз смотреть в терминал для получения информации о выполнении задач не очень удобно, особенно, когда задач не 1-2, а гораздо больше. У Celery есть инструмент для мониторинга - Flower.

Имеет отдельную документацию

Установка

pip install flower

Запуск командой

celery flower --port=5566

Но если запустить его просто так, то по адресу http://localhost:5566/ мы, конечно, увидим интерфейс flower, но он не будет ни к чему подключен.

Для запуска flower нужен еще один терминал и команда flower

Terminal
$ celery --app=simple_examples.tasks:app worker -E -l INFO
 -------------- celery@tsarkovilya v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.15.0-48-generic-x86_64-with-glibc2.35 2022-09-25 15:29:55
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         db_work:0x7f563cbb1e40
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: ON
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . create-table-task
  . insert-into-table-task

[2022-09-25 15:29:56,059: INFO/MainProcess] Connected to redis://localhost:6379/0
[2022-09-25 15:29:56,063: INFO/MainProcess] mingle: searching for neighbors
[2022-09-25 15:29:57,070: INFO/MainProcess] mingle: all alone
[2022-09-25 15:29:57,079: INFO/MainProcess] celery@tsarkovilya ready.
Terminal(2)
$ celery --app=simple_examples.tasks:app beat -l INFO
celery beat v5.2.7 (dawn-chorus) is starting.
__  -  ... __   -   _
LocalTime -> 2022-09-25 15:29:59
Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2022-09-25 15:29:59,537: INFO/MainProcess] beat: Starting...
[2022-09-25 15:29:59,557: INFO/MainProcess] Scheduler: Sending due task add_note_to_db (insert-into-table-task)
Terminal(3)
$ celery --app=simple_examples.tasks:app flower --address=127.0.0.1 --port=5566
[I 220925 15:29:21 command:162] Visit me at http://127.0.0.1:5566
[I 220925 15:29:21 command:170] Broker: redis://localhost:6379/0
[I 220925 15:29:21 command:171] Registered tasks:
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap',
     'create-table-task',
     'insert-into-table-task']
[W 220925 15:29:21 command:177] Running without authentication
[I 220925 15:29:21 mixins:225] Connected to redis://localhost:6379/0

Таким образом запуск воркеров, периодические задачи и flower будет выглядеть следующим образом

По адресу http://127.0.0.1:5566 мы теперь видим интерфейс flower подключенный к redis.

Например, на вкладе с задачами мы теперь видим подробную информацию о каждой задаче. Более того, в каждую из них можно перейти и увидеть еще более расширенную информацию.

rabbitMQ как брокер

rabbitMQ, как и redis, - инструмент, который может работать самостоятельно, и использование его в связке с celery необязательное условие. По этой причине более детальные о rabbitMQ и redis мы поговорим в материалах, посвященным этим инструментам. Сейчас же мы просто посмотрим на минимальные необходимые настройки, которые нужны для использования rabbitMQ как брокера в связке с celery и посмотрим на минимальный набор команд для работы с rabbitMQ.

На этой странице официальной документации можно ознакомится с установкой под разные ОС.

Установим сервер rabbitMQ на машину с linux (kubuntu)

sudo apt-get install rabbitmq-server

Сервер rabbitMQ можно запускать и останавливать

Команды

systemctl start rabbitmq-server

systemctl enable rabbitmq-server

соответственно.

А для просмотра состояния сервера используйте

systemctl status rabbitmq-server

Если в ответ на status вы увидели состояние active, то все работает правильно.

Для управления сервером нужны пользователи.

Создается пользователь командой

sudo rabbitmqctl add_user myuser mypassword

myuser и mypassword вы придумываете сами.

Для просмотра списка пользователей используется команда

sudo rabbitmqctl list_users

А для удаления

sudo rabbitmqctl delete_user myuser

Соответственно myuser имя удаляемого пользователя из списка пользователей, в ответ на delete_user мы получим имя удаленного пользователя.

Далее, мы можем создать виртуальный хост и дать пользователю права на него.

Создается хост командой, имя хоста задаем сами

sudo rabbitmqctl add_vhost example_host

Список хостов можно увидеть по команде.

sudo rabbitmqctl list_vhosts

Интересующего нас пользователя мы можем тегировать как админа командой.

rabbitmqctl set_user_tags myuser administrator

И выдать этому пользователю права на это хостинг

sudo rabbitmqctl set_permissions -p example_host myuser ".*" ".*" ".*"

Три копии ".*" - это три регулярных выражения, по порядку для conf, write и read. Запись ".*" означает полное разрешение.

Чтобы проверить успешность выполненных действий нужно включить веб-интерфейс.
Команда

sudo rabbitmq-plugins enable rabbitmq_management

Теперь по адресу http://localhost:15672/ доступна форма авторизации, зайдите в нее под данными созданного пользователя.
Соответственно, если сервер развернут не на локальной машине, то ip заменяется на нужный.

Это те самые минимальные команды для работы с rabbitMQ сервером, которые я хотел вам показать.

Ну и теперь посмотрим как подключить rabbitMQ как брокер для celery

Заменим настройку celeryconfig.py на

broker_url = 'amqp://myuser:mypassword@localhost:5672/example_host'

Где, myuser, mypassword и example_host мы создали командами выше

Теперь вы можете запустить воркеров все той же командой

$ celery --app=simple_examples.tasks:app worker -E -l INFO

И увидеть в строке transport строку

amqp://admin:**@localhost:5672/example_host

Более того в интерфейсе rabbitMQ по адресу http://localhost:15672/ мы увидим информацию о подключении к celery.

Что дальше?

Таким образом мы познакомились как можно использовать redis и rabbitMQ, разобрались что вообще такое брокеры, бэкенды и как их можно использовать, узнали как включить мониторинг и как вообще управлять задачами. Конечно, это не все, что можно было обсудить, как минимум не затронута тема использования Celery в связке с Django, хотя отличий от рассмотренного здесь почти ни каких нет, тем не менее отдельный материал по Celery + Django будет. В любом случае главная цель этого 'введения' в celery познакомить вас с общей идеей и возможностями этого инструмента, а для более углубленного изучения всегда существует документация.

Для отправки комментария необходимо авторизоваться



Комментарии

Здесь пока ничего нет...