Python. Celery
Введение
Celery - ПО для работы с, так называемыми, задачами. Идея в том, что мы создаем объект Celery и декорируем им функции, которые планируем обрабатывать с помощью api Celery, это и есть наши задачи. Обычно все сводится к фоновой работе каких-то, долго исполняемых, задач. Наш синхронно работающий python и его синхронно работающий Django (или прочие фреймворки) исполняет задачи сверху вниз, поэтому если в очередь на исполнение встала задача, которая будет выполняться долго, все приложение перестанет отвечать пока эта задача не выполнится. Вы возможно замечали на каких-нибудь сайтах, как он после какого-то действия становится не отзывчив к нажатиям, а через время это проходит, так вот скорее всего вызвано это тем, что в это время выполняется какая-то не мгновенная задача. Celery призван решать эту проблему, посредством выполнения долго исполняемых задач в фоновом режиме.
Как я упомянул выше, объект Celery, которым мы декорируем какую-то функцию, превращает эту функцию в объект, который мы будем называть task.
Для того чтобы обрабатывать эти task'и, существуют другие объекты Celery, которые называются worker.
И последней компонент успешно работающего Celery - broker. broker выполняет роль распределителя задач между worker'ами. Для успешной работы broker должен быть активен, то есть broker поднимается, скажем как web-сервер, и пока он поднят он распределяет task'и между worker'ами и следит, чтобы вся эта схема работала. Результаты выполненных задач должны где-то хранится, и эту задачу на себя также берет broker (в celery мы будем называть это backend). Дело в том, что для хранения данных нужна какая-то база, но не такая база, которые мы привыкли видеть, а база типа NoSQL. И backend task'ов как раз на таких базах данных и построен. В качестве broker'ов выделяют redis и rabbitMQ, есть еще некоторые менее популярные, но будет достаточно знакомства с двумя. И redis и rabbitMQ могу выступать как и в качестве broker'а, так и в качестве backend'а, но rabbitMQ в качестве бэкенда не совсем самостоятелен, чаще используют связку rabbitMQ - broker, redis - backend. Как пишут сами разработчики, "redis более подвержен потере данных в случае внезапного завершения работы или сбоев питания", возможно именно по этому rabbitMQ предпочитают использовать в качестве broker'а чаще чем redis, ну и плюс потому что rabbitMQ поддерживает AMQP (Расширенный протокол очереди сообщений), обсудим этот момент подробнее позже. Но познакомимся мы в этом материале и с redis и с rabbitMQ.
Надеюсь картинка в вашей голове, пусть и размытая, сформирована и мы можем написать первый самый простой пример, который наглядно продемонстрирует все, что написано выше.
официальная документация celery
Демо. Task
Для материалов по celery я завел новую директорию, создал виртуальное окружение и добавил gitignore. Поэтому к моменту как вы это читаете все разобранные примеры должны быть на моем github в соответствующем репозитории.
Я рекомендую вам взять за правило использовать этот минимальный набор подготовительных работ. Польза такой структурированности ваших проектов по изолированным виртуальным окружениям и хранении их на github будет становиться все ощутимей по мере роста количества этих самых проектов.
Скорее всего вы все равно придете к этому, с моего совета, с чьего-то еще, самостоятельно, но придете. Так что, если вы еще так не делаете, советую прислушаться.
Для работы с celery его надо установить.
И заодно установим сразу и redis и rabbitMQ.
pip install celery
pip install rabbitmq
pip install redis
sudo apt-get install redis-server
Теперь создадим пустой файл и напишем там следующее
from celery import Celery import datetime app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost') @app.task def now_time(): return datetime.datetime.now()
app содержит объект Celery, состоящий в нашем случае из имени - 'tasks', брокера и бэкенда. На данный момент будем использовать redis для обеих сущностей.
Брокер должен содержать адрес, для redis можно использовать такой - 'redis://localhost:6379/0'. 6379 - стандартный redis port. 0 - NoSQL база данных, в redis они называются цифрами, от 0 и далее. redis://localhost - означает, что мы используем redis и крутиться он будет на локальном хосте - 127.0.0.1.
Бэкенд также содержит адрес, достаточно указать, имя и на каком устройстве он будет крутиться.
Теперь этим объектом мы можем декорировать произвольное число функций и к ним будут применимы методы Celery. Соответственно объектов Celery, при необходимости, мы тоже можем создать несколько, с разными брокерами и бэкендами.
Ну и сама функция просто возвращает текущие дату и время.
Попробуем перейти в консоль и поставить задачу в очередь на выполнение
>>> from simple_examples.celery_first_example import now_time >>> now_time.delay() <AsyncResult: 497d3763-3842-4f4b-9aa8-f4b229bd331e> >>> result = now_time.delay() >>> result.get() >>> result_2 = now_time.delay() >>> result_2.get()
Для отправки задачи в очередь существует метод .delay(), и если функция содержит аргументы то передаются они также в этот метод.
Возвращает .delay() объект AsyncResult.
К этому и объекту уже применимы методы для работы с задачей и этот объект мы и будем обрабатывать, для этого сохраним его в переменную.
Метод .get() используется для получения результата задачи и в данному случае мы должны были после применения этого метода увидеть дату и время, но не увидели. Создадим еще одну переменную, содержащую AsyncResult и применим .get() к ней, результата мы также не видим, но и никакими ошибками вызовы не завершаются.
В PyCharm - Python Console есть такой интерфейс как Show Command Queue
Выглядит он так. Мы видим, что первый вызов .get() встал в очередь и все последующие команды также встают в эту очередь. И случилось это потому что некому брать эти задачи в работу, нет тех самых воркеров, о которых я упоминал во вступлении. Давайте запустим воркеров
$ celery --app=simple_examples.celery_first_example:app worker
-------------- celery@tsarkovilya v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.15.0-47-generic-x86_64-with-glibc2.35 2022-09-16 19:58:33
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f612f2e1c60
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
Команда для запуска начинается с флага --app через знак равно указывается путь до файла, где лежит объект Celery, (app в нашем случае) через двоеточие имя объекта Celery и команда worker, который запустит воркеров.
После запуска команды вы уведите подобную системную информацию.
Самая верхняя строка - имя воркера, сформировано оно автоматически
Далее информация об устройстве
После config, включает он:
app - объект Celery
transport - broker
results - backend
concurrency - количество выделенных prefork процессов, которые и будут обрабатывать наши задачи
task events - логирование действий воркеров, нужно по большей степени при использовании мониторингов, пока мы об этом не говорили и данную функцию поэтому пока трогать не будем
queues - информация о запущенных очередях, по умолчанию используется одна очередь - celery.
Воркеры запущены, теперь можно вернуться в Python Console и посмотреть произошли-ли какие-нибудь изменения
...
>>> result_2.get()
'2022-09-16T19:58:34.708551'
'2022-09-16T19:58:34.734809'
Произошли. Все задачи, находящиеся в очереди, выполнились сразу, как только были запущены воркеры.
Вот на таком не хитром примере мы разобрались со скелетом Celery, далее будем говорить об отдельных возможностях этого инструмента подробнее.
Конфигурация
Разобравшись с логикой и идеей задач, становится понятно, что при внедрении Celery в ваши проекты, вы всегда будете использовать для работы некоторые настройки. Конечно, первым делом - настройки для брокера и для бэкенда, плюс присутствует еще ряд настроек, которые мы можем изменять. Как и в случае любой другой утилиты, которая так или иначе требует нашего участия в ее конфигурации, мы хотим вынести все эти настройки в отдельный файл, или как, например, в случае с Django в общий файл с настройками, где помимо самих настроек мы храним и наши конфигурационные константы.
И Celery разумеется не исключение, чаще вы будете видеть настройки брокера и прочего в отдельном файле, а не в качестве аргументов при объявлении объекта Celery.
Предлагает Celery для этого несколько вариантов, но самый предпочтительный это, конечно, вынесение всех настроек в отдельный файл.
У меня файл celery_first_example.py находится в папке simple_examples, создам в этой же папке файл celeryconfig.py со следующим содержанием
REDIS_HOST = 'redis://localhost' REDIS_PORT = '6379' REDIS_BD = '0' broker_url = REDIS_HOST + ':' + REDIS_PORT + '/' + REDIS_BD result_backend = REDIS_HOST task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Moscow' enable_utc = True
По большей части взяты настройки из документации, от себя я добавил объявление констант для редиса и формирование из них адреса брокера и бэкенда, именно в таком формате вы скорее всего будете сталкиваться с конфигурацией Celery.
Построчно тут особо не на чем заострять внимания, настройка брокера, бэкенда, обработка данных в формате json, времени и прочего. Со всеми этими настройками можно ознакомиться тут, там все подробно описано, поэтому не станем отдельно подробно на этом останавливаться.
Файл конфигурации пока не связана с объектом Celery. Вам могло показаться, что Celery сам будет искать файл с названием celeryconfig.py и брать от туда настройки, я тоже поначалу так думал, но на самом деле это условие совсем необязательное, можно использовать любое название для этого файла.
Подключим файл следующим образом.
from celery import Celery import datetime app = Celery('tasks') app.config_from_object('simple_examples.celeryconfig') @app.task(name='what-time-is-it') def now_time(): return datetime.datetime.now() @app.task(name='what-amount') def amount(x, y): return x + y
Уберем из аргументов app настройки написанные ранее, оставим только имя, а ниже воспользуемся методом .config_from_object(), в качестве аргумента метод принимает путь до файла с конфигурацией.
Плюс я добавил еще одну задачу, просто для демонстрации, как объявить несколько задач, как видно, все максимально очевидно.
И добавил аргумента name для каждой задачи, задачи имеют рад настраиваемых аргументов, позже посмотрим на некоторые. Имя каждой задачи должно быть уникальным и если не задавать его вручную оно сформируется автоматически.
Теперь можно запустить воркеров и убедиться, что файл конфигурации успешно привязан к объекту Celery, если это не так вы увидите сообщение об ошибке.
Если ошибок вы не увидели, но все равно сомневаетесь, что ваша конфигурация применена, попробуйте изменить какую-нибудь настройку, например допустить ошибку в настройке для брокера и посмотреть на изменения.
Запускать воркеров можно с параметрами, все их можно увидеть по команде celery worker --help, например, воспользуемся флагом --loglevel, сокращенно -l и укажем явно, что нас интересует логирование уровня INFO. Также добавим флаг -E, который активирует --task-events, но как упоминалось выше без мониторинга для нас этот флаг ничего не изменит.
$ celery --app=simple_examples.celery_first_example:app worker -E --loglevel INFO
-------------- celery@tsarkovilya v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.15.0-47-generic-x86_64-with-glibc2.35 2022-09-18 16:17:53
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f9043a11f90
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: ON
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. what-amount
. what-time-is-it
[2022-09-18 16:17:53,850: INFO/MainProcess] Connected to redis://localhost:6379/0
[2022-09-18 16:17:53,854: INFO/MainProcess] mingle: searching for neighbors
[2022-09-18 16:17:54,862: INFO/MainProcess] mingle: all alone
[2022-09-18 16:17:54,870: INFO/MainProcess] celery@tsarkovilya ready.
Как видно с флагом для логирования мы сразу видим дополнительную информацию. Давайте запустим несколько задач и посмотрим какую информацию мы будем при этом логировать.
>>> from simple_examples.celery_first_example import * >>> res = now_time.delay() >>> other_res = amount.delay(10, 11) >>> res.get() '2022-09-18T16:21:38.329214' >>> other_res.get() 21
... [2022-09-18 16:17:54,870: INFO/MainProcess] celery@tsarkovilya ready. [2022-09-18 16:21:38,327: INFO/MainProcess] Task what-time-is-it[c5bca45f-9e10-4b8a-8ce2-7a897e935046] received [2022-09-18 16:21:38,334: INFO/ForkPoolWorker-8] Task what-time-is-it[c5bca45f-9e10-4b8a-8ce2-7a897e935046] succeeded in 0.00525428900073166s: datetime.datetime(2022, 9, 18, 16, 21, 38, 329214) [2022-09-18 16:21:42,839: INFO/MainProcess] Task what-amount[c0cf428b-a4ab-41b3-8d0c-a250654fa99b] received [2022-09-18 16:21:42,842: INFO/ForkPoolWorker-8] Task what-amount[c0cf428b-a4ab-41b3-8d0c-a250654fa99b] succeeded in 0.0007629800002177944s: 21
Мы получаем информацию о том какая задача была выполнена, во сколько, за сколько и каков результат ее исполнения. И можно также увидеть, что результат записывается в базу сразу после выполнения задачи, а не при использовании метода .get().
Все прекрасно работает, а значит и наши настройки конфигурации успешно применены.
Periodic Tasks
Иногда может оказаться необходимым исполнять задачи в определенное время, раз в час, раз в месяц, первый день каждого квартала и так далее. Celery позволяет достаточно просто это реализовать. Для примера давайте напишем программу немного поинтереснее, чем писали выше, допустим такую
import celery import random import string import datetime import psycopg2 # Модуль для работы с исполнением задач по расписанию from celery.schedules import crontab # Конфигурация БД connection = psycopg2.connect( host='127.0.0.1', database='celery_example_db', user='(имя пользователя)', password='(пароль от PgAdmin)', # Классический PostgreSQL порт port=5432, ) # Автоматическое применение изменений connection.autocommit = True # Объект Celery с импортированными настройками конфигурации app = celery.Celery('db_work') app.config_from_object('simple_examples.celeryconfig') # Конфигурация отправки по времени app.conf.beat_schedule = { 'add_note_to_db': { # Выполнение задачи раз в минуту 'task': 'insert-into-table-task', 'schedule': crontab(minute='*/1'), }, } @app.task(name='create-table-task') def create_table(): """Задача на создание таблицы в БД""" # Вывод информации о версии postgreSQL и ПК with connection.cursor() as cursor: # Объект курсора устанавливает соединение с БД # Метод .execute() для SQL-запроса cursor.execute( "SELECT version();" ) # Метод .fetchone() возвращает результат работы .execute() print(f'Версия - {cursor.fetchone()}') # Создание новой таблицы с полями id и log_info with connection.cursor() as cursor: cursor.execute( """CREATE TABLE results( id serial PRIMARY KEY, log_info text NOT NULL);""" ) print('Таблица results создана') @app.task(name='insert-into-table-task') def insert_into_table(): """Задача для добавления новой записи в таблицу""" # Генерация случайно строки из цифр и букв letters_and_digits = string.ascii_letters + string.digits some_string = ''.join(random.choice(letters_and_digits) for i in range(random.randint(30, 50))) # Добавление сгенерированной строки в таблицу results with connection.cursor() as cursor: cursor.execute( "INSERT INTO results (log_info) VALUES(%s);", [some_string]) print(f'Запись {some_string} добавлена в таблицу results') # Генерация результата, возвращаемого задачей res = f'{some_string} добавлена в {datetime.datetime.now()}' return res
Создадим БД в postgreSQL и с помощью библиотеки psycopg2 подключимся к ней.
документация psycopg2.
Для установки psycopg2 используйте команду
pip install psycopg2-binary
Подключаемся к БД через .connect(), куда указываем все данные от нашей БД и включаем .autocommit = True. Таким образом все изменения над базой будут применяться без дополнительного подтверждения.
Далее создаем объект Celery, конфигурация точно такая же, как использовалась в предыдущем разделе.
Конфигурацию отправки по времени рассмотрим в последнюю очередь.
Создадим две задачи, первая на создание таблицы, которая будет состоять всего из двух полей, вторая - для добавления случайно сгенерированной последовательности символов в эту таблицу. Методом .cursor() мы активируем соединение с БД и внутри метода .execute() можем писать SQL-запросы классическим SQL-синтаксисом. Ну и поскольку соединение нужно закрывать для сохранения данных и изменений используем менеджер контекста.
Задача по созданию таблицы и выводе служебной информации о БД и ПК не будет ничего возвращать, а задача на добавление новой записи будет возвращать саму сгенерированную запись и время, когда это произошло.
Сначала поднимем воркеров и попробуем выполнить эти задачи через Python Console.
$ celery --app=simple_examples.tasks:app worker -l INFO
-------------- celery@tsarkovilya v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.15.0-48-generic-x86_64-with-glibc2.35 2022-09-25 12:54:44
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: db_work:0x7f7e70a9c160
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. create-table-task
. insert-into-table-task
[2022-09-25 12:54:44,809: INFO/MainProcess] Connected to redis://localhost:6379/0
[2022-09-25 12:54:44,812: INFO/MainProcess] mingle: searching for neighbors
[2022-09-25 12:54:45,821: INFO/MainProcess] mingle: all alone
[2022-09-25 12:54:45,841: INFO/MainProcess] celery@tsarkovilya ready.
>>> from simple_examples.tasks import * >>> create_table.delay() <AsyncResult: c43c5881-4e72-4a60-95ed-181533a3ffc2> >>> insert_into_table.delay() <AsyncResult: 52b32039-eb16-42b1-85c7-55791a4fee59>
Запустим воркеров с флагом -l INFO, чтобы просматривать информацию об исполняемых задачах. И запустим создание таблицы и добавление записи.
После этого вернемся в терминал и посмотрим информацию
... [2022-09-25 12:55:53,308: INFO/MainProcess] Task create-table-task[c43c5881-4e72-4a60-95ed-181533a3ffc2] received [2022-09-25 12:55:53,310: WARNING/ForkPoolWorker-7] Версия - ('PostgreSQL 12.11 (Ubuntu 12.11-0ubuntu0.20.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0, 64-bit',) [2022-09-25 12:55:53,323: WARNING/ForkPoolWorker-7] Таблица results создана [2022-09-25 12:55:53,328: INFO/ForkPoolWorker-7] Task create-table-task[c43c5881-4e72-4a60-95ed-181533a3ffc2] succeeded in 0.018422689999852082s: None [2022-09-25 12:56:01,091: INFO/MainProcess] Task insert-into-table-task[52b32039-eb16-42b1-85c7-55791a4fee59] received [2022-09-25 12:56:01,103: WARNING/ForkPoolWorker-7] Запись kROEX1URp5jKc0b4SNAO3kTiPD7ev5wLKvRKjvsbxjp добавлена в таблицу results [2022-09-25 12:56:01,105: INFO/ForkPoolWorker-7] Task insert-into-table-task[52b32039-eb16-42b1-85c7-55791a4fee59] succeeded in 0.012491166000017984s: 'kROEX1URp5jKc0b4SNAO3kTiPD7ev5wLKvRKjvsbxjp добавлена в 2022-09-25 12:56:01.103840'
Как видно все отлично работает. Если открыть pgAdmin мы увидим созданную таблицу result и добавленную в нее запись.
Теперь что касается периодических задач. У меня уже написана конфигурация дла выполнения второй задачи раз в минуту. И при текущем запуске я не комментировал эту конфигурацию, запустил все как есть, тем не менее, раз в минуту никакого выполнения задачи не происходит. Перед тем как запустит выполнение давайте взглянем на настройку.
Конфигурация периодичности записывается в методе .beat_schedule в виде словаря, каждая новая запись этого словаря это пара состоящая из имени, которе мы придумываем сами и параметров также в виде словаря. В качестве параметров в самом простом варианте выступает сам task и schedule(расписание) для task. В task мы передаем имя задачи из параметра name, а не имя функцию, которую мы декорируем.
Для настройки расписания используем celery crontab. Вообще cron это как раз и есть утилита для временного исполнения задач, это какая-то уникальная celery утилита, это утилита unix-систем. И используется она, конечно, не только в python. Подробно с примера использования crontab в celery для разных временных промежутков можно найти в этом разделе документации.
Запись crontab(minute='*/1') означает выполнять задачу каждую минуту. Минута это минимально возможное время периодичности, поэтому запись crontab(minute='*/1') можно заменить на crontab() и это будет означать то же самое.
Запускаются периодические задачи похожим образом с воркерами, используем тот же путь, только команда worker заменятся на beat.
Ну и открыть все это дело нужно в другом терминале, поскольку в первом уже запущены воркеры.
$ celery --app=simple_examples.tasks:app beat -l INFO
celery beat v5.2.7 (dawn-chorus) is starting. __ - ... __ - _ LocalTime -> 2022-09-25 13:19:46 Configuration -> . broker -> redis://localhost:6379/0 . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule . logfile -> [stderr]@%INFO . maxinterval -> 5.00 minutes (300s) [2022-09-25 13:19:46,878: INFO/MainProcess] beat: Starting... [2022-09-25 13:19:46,906: INFO/MainProcess] Scheduler: Sending due task add_note_to_db (insert-into-table-task) [2022-09-25 13:20:00,002: INFO/MainProcess] Scheduler: Sending due task add_note_to_db (insert-into-table-task) [2022-09-25 13:21:00,050: INFO/MainProcess] Scheduler: Sending due task add_note_to_db (insert-into-table-task)
... [2022-09-25 13:19:46,932: INFO/MainProcess] Task insert-into-table-task[b5894245-3ea2-4f50-9bd8-b5826d10ccf5] received [2022-09-25 13:19:46,933: WARNING/ForkPoolWorker-7] Запись TgQYBCWlBSrduKnCVv7f4vClVTjSuK2Wa8aJLGdIq67L4fY8 добавлена в таблицу results [2022-09-25 13:19:46,934: INFO/ForkPoolWorker-7] Task insert-into-table-task[b5894245-3ea2-4f50-9bd8-b5826d10ccf5] succeeded in 0.0015785430000505585s: 'TgQYBCWlBSrduKnCVv7f4vClVTjSuK2Wa8aJLGdIq67L4fY8 добавлена в 2022-09-25 13:19:46.934026' [2022-09-25 13:20:00,005: INFO/MainProcess] Task insert-into-table-task[5ae22c7a-1067-4a06-902f-1512ef0fa4d1] received [2022-09-25 13:20:00,016: WARNING/ForkPoolWorker-7] Запись bwnfTvC8u97MiS6FVGX1Aq6LeK7wechoJv48jT3KFLQC добавлена в таблицу results [2022-09-25 13:20:00,017: INFO/ForkPoolWorker-7] Task insert-into-table-task[5ae22c7a-1067-4a06-902f-1512ef0fa4d1] succeeded in 0.010723345999849698s: 'bwnfTvC8u97MiS6FVGX1Aq6LeK7wechoJv48jT3KFLQC добавлена в 2022-09-25 13:20:00.016937' [2022-09-25 13:21:00,053: INFO/MainProcess] Task insert-into-table-task[a079365b-2db1-4bc9-ad4c-1016f99a8cfc] received [2022-09-25 13:21:00,063: WARNING/ForkPoolWorker-7] Запись DkvkKsSQqhQu5T5oMMRqtKptNJoWF7hB7w60 добавлена в таблицу results [2022-09-25 13:21:00,065: INFO/ForkPoolWorker-7] Task insert-into-table-task[a079365b-2db1-4bc9-ad4c-1016f99a8cfc] succeeded in 0.01149353400023756s: 'DkvkKsSQqhQu5T5oMMRqtKptNJoWF7hB7w60 добавлена в 2022-09-25 13:21:00.064120'
При запуске beat мы видим некоторую служебную информацию и далее каждую минуту видим запись о том, что задача отправлена на исполнение. Обратите внимание первая задача запустилась сразу при запуске в 13:19:46, а вторая через 14 секунд ровно в 13:20:00 и последующие задачи запускаются уже в 00 каждой минуты.
В первом терминале мы видим информацию о выполнении каждой задачи, таким образом мы достаточно просто реализовали автоматическое ежеминутное добавление записи в БД.
Если вам вдруг понадобится выполнять задачи чаще чем раз в минут, то crontab вам не подойдет, вы можете использовать, например, .timedelta() и datetime, куда передадите количество секунд и в аком случае все тоже будет работать.
'schedule': datetime.timedelta(seconds=10),
flower
Каждый раз смотреть в терминал для получения информации о выполнении задач не очень удобно, особенно, когда задач не 1-2, а гораздо больше. У Celery есть инструмент для мониторинга - Flower.
Имеет отдельную документацию
Установка
pip install flower
Запуск командой
celery flower --port=5566
Но если запустить его просто так, то по адресу http://localhost:5566/ мы, конечно, увидим интерфейс flower, но он не будет ни к чему подключен.
Для запуска flower нужен еще один терминал и команда flower
$ celery --app=simple_examples.tasks:app worker -E -l INFO
-------------- celery@tsarkovilya v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.15.0-48-generic-x86_64-with-glibc2.35 2022-09-25 15:29:55
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: db_work:0x7f563cbb1e40
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: ON
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. create-table-task
. insert-into-table-task
[2022-09-25 15:29:56,059: INFO/MainProcess] Connected to redis://localhost:6379/0
[2022-09-25 15:29:56,063: INFO/MainProcess] mingle: searching for neighbors
[2022-09-25 15:29:57,070: INFO/MainProcess] mingle: all alone
[2022-09-25 15:29:57,079: INFO/MainProcess] celery@tsarkovilya ready.
$ celery --app=simple_examples.tasks:app beat -l INFO
celery beat v5.2.7 (dawn-chorus) is starting. __ - ... __ - _ LocalTime -> 2022-09-25 15:29:59 Configuration -> . broker -> redis://localhost:6379/0 . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule . logfile -> [stderr]@%INFO . maxinterval -> 5.00 minutes (300s) [2022-09-25 15:29:59,537: INFO/MainProcess] beat: Starting... [2022-09-25 15:29:59,557: INFO/MainProcess] Scheduler: Sending due task add_note_to_db (insert-into-table-task)
$ celery --app=simple_examples.tasks:app flower --address=127.0.0.1 --port=5566
[I 220925 15:29:21 command:162] Visit me at http://127.0.0.1:5566 [I 220925 15:29:21 command:170] Broker: redis://localhost:6379/0 [I 220925 15:29:21 command:171] Registered tasks: ['celery.accumulate', 'celery.backend_cleanup', 'celery.chain', 'celery.chord', 'celery.chord_unlock', 'celery.chunks', 'celery.group', 'celery.map', 'celery.starmap', 'create-table-task', 'insert-into-table-task'] [W 220925 15:29:21 command:177] Running without authentication [I 220925 15:29:21 mixins:225] Connected to redis://localhost:6379/0
Таким образом запуск воркеров, периодические задачи и flower будет выглядеть следующим образом
По адресу http://127.0.0.1:5566 мы теперь видим интерфейс flower подключенный к redis.
Например, на вкладе с задачами мы теперь видим подробную информацию о каждой задаче. Более того, в каждую из них можно перейти и увидеть еще более расширенную информацию.
rabbitMQ как брокер
rabbitMQ, как и redis, - инструмент, который может работать самостоятельно, и использование его в связке с celery необязательное условие. По этой причине более детальные о rabbitMQ и redis мы поговорим в материалах, посвященным этим инструментам. Сейчас же мы просто посмотрим на минимальные необходимые настройки, которые нужны для использования rabbitMQ как брокера в связке с celery и посмотрим на минимальный набор команд для работы с rabbitMQ.
На этой странице официальной документации можно ознакомится с установкой под разные ОС.
Установим сервер rabbitMQ на машину с linux (kubuntu)
sudo apt-get install rabbitmq-server
Сервер rabbitMQ можно запускать и останавливать
Команды
systemctl start rabbitmq-server
systemctl enable rabbitmq-server
соответственно.
А для просмотра состояния сервера используйте
systemctl status rabbitmq-server
Если в ответ на status вы увидели состояние active, то все работает правильно.
Для управления сервером нужны пользователи.
Создается пользователь командой
sudo rabbitmqctl add_user myuser mypassword
myuser и mypassword вы придумываете сами.
Для просмотра списка пользователей используется команда
sudo rabbitmqctl list_users
А для удаления
sudo rabbitmqctl delete_user myuser
Соответственно myuser имя удаляемого пользователя из списка пользователей, в ответ на delete_user мы получим имя удаленного пользователя.
Далее, мы можем создать виртуальный хост и дать пользователю права на него.
Создается хост командой, имя хоста задаем сами
sudo rabbitmqctl add_vhost example_host
Список хостов можно увидеть по команде.
sudo rabbitmqctl list_vhosts
Интересующего нас пользователя мы можем тегировать как админа командой.
rabbitmqctl set_user_tags myuser administrator
И выдать этому пользователю права на это хостинг
sudo rabbitmqctl set_permissions -p example_host myuser ".*" ".*" ".*"
Три копии ".*" - это три регулярных выражения, по порядку для conf, write и read. Запись ".*" означает полное разрешение.
Чтобы проверить успешность выполненных действий нужно включить веб-интерфейс.
Команда
sudo rabbitmq-plugins enable rabbitmq_management
Теперь по адресу http://localhost:15672/ доступна форма авторизации, зайдите в нее под данными созданного пользователя.
Соответственно, если сервер развернут не на локальной машине, то ip заменяется на нужный.
Это те самые минимальные команды для работы с rabbitMQ сервером, которые я хотел вам показать.
Ну и теперь посмотрим как подключить rabbitMQ как брокер для celery
Заменим настройку celeryconfig.py на
broker_url = 'amqp://myuser:mypassword@localhost:5672/example_host'
Где, myuser, mypassword и example_host мы создали командами выше
Теперь вы можете запустить воркеров все той же командой
$ celery --app=simple_examples.tasks:app worker -E -l INFO
И увидеть в строке transport строку
amqp://admin:**@localhost:5672/example_host
Более того в интерфейсе rabbitMQ по адресу http://localhost:15672/ мы увидим информацию о подключении к celery.
Что дальше?
Таким образом мы познакомились как можно использовать redis и rabbitMQ, разобрались что вообще такое брокеры, бэкенды и как их можно использовать, узнали как включить мониторинг и как вообще управлять задачами. Конечно, это не все, что можно было обсудить, как минимум не затронута тема использования Celery в связке с Django, хотя отличий от рассмотренного здесь почти ни каких нет, тем не менее отдельный материал по Celery + Django будет. В любом случае главная цель этого 'введения' в celery познакомить вас с общей идеей и возможностями этого инструмента, а для более углубленного изучения всегда существует документация.
Для отправки комментария необходимо авторизоваться
Комментарии
Здесь пока ничего нет...