W poprzednim poście skonfigurowaliśmy Celery w projekcie Django i stworzyliśmy prosty przykład zadania asynchronicznego. Teraz przejdziemy do bardziej zaawansowanych i praktycznych aspektów pracy z Celery. Omówimy, jak tworzyć różne rodzaje zadań, zarządzać wynikami, radzić sobie z błędami oraz jak efektywnie korzystać z Celery jako zamiennika dla crona dzięki Celery Beat. Wszystko to wzbogacimy o konkretne przykłady w Django.
Agenda
- Tworzenie i wykonywanie zadań
- Definiowanie zadań (@shared_task)
- Uruchamianie zadań (delay(), apply_async())
- Obsługa wyników zadań
- Niezawodność w Celery
- Automatyczne ponowne próby
- Idempotentność zadań
- Przykład idempotentnej wysyłki e-maila
- Najlepsze praktyki przy pracy z Celery
- Monitorowanie i logowanie (Flower)
- Obsługa błędów i wyjątków
- Debugowanie zadań asynchronicznych
- Periodic Tasks – Celery jako zamiennik crona
- Instalacja Celery Beat
- Konfiguracja harmonogramu
- Programowe dodawanie zadań okresowych
- Podsumowanie i zapowiedź kolejnego posta
Tworzenie i wykonywanie zadań
Definiowanie zadań (@shared_task)
Zadania w Celery definiuje się jako funkcje Pythonowe. Najczęściej używany jest dekorator @shared_task, który pozwala na współdzielenie zadania między różnymi aplikacjami Django. Przykład prostego zadania:
from celery import shared_task
@shared_task
def add(x, y):
"""Proste zadanie dodające dwie liczby."""
return x + yMożesz również użyć dekoratora @app.task, jeśli chcesz bardziej precyzyjnie zarządzać zadaniami w kontekście konkretnej instancji Celery.
Uruchamianie zadań (delay(), apply_async())
Celery oferuje dwa główne sposoby uruchamiania zadań:
delay()
Najprostszy sposób na dodanie zadania do kolejki:result = add.delay(4, 6)To wywołanie natychmiast dodaje zadanie do brokera, a worker zajmuje się jego przetwarzaniem. W tle.
apply_async()
Umożliwia bardziej zaawansowane opcje, takie jak ustawienie opóźnienia czy priorytetów:result = add.apply_async((4, 6), countdown=10) # Uruchomienie zadania po 10 sekundach
Obsługa wyników zadań
Każde zadanie zwraca obiekt AsyncResult, który pozwala monitorować status i wynik zadania. Oto przykład widoku Django do sprawdzania statusu i wyniku:
from django.http import JsonResponse
from celery.result import AsyncResult
def task_status_view(request, task_id):
"""Widok do sprawdzania statusu zadania."""
result = AsyncResult(task_id)
response = {"task_id": task_id, "status": result.status}
if result.ready(): # Jeśli zadanie zostało zakończone
response["result"] = result.get() # Pobranie wyniku
return JsonResponse(response)Niezawodność w Celery
Dzięki obsłudze automatycznych ponownych prób i dokładnemu logowaniu przebiegu zadań, Celery jest narzędziem stabilnym i przewidywalnym.
Weźmy sobie prosty przykład. Nasza aplikacja wysyła maila do użytkownika z potwierdzeniem operacji zakupu. Jest to bardzo ważny email, zawiera informacje o kwocie i zamówionych produktach. Użytkownik musi go otrzymac. Wysyłamy go w tle, żeby nie trzymać użytkownika w nieskończoność na stronie. Przepływ wyglada tak (kliknij grafikę, aby powiększyć):
A co może pójść nie tak?
- serwer pocztowy może przechodzić okresową konserwacje/mieć awarie
- mogą być przejściowe problemy z siecią - brak połączenia z serwerem siecowym
- przeciążenie serwera poczty
- problemy z DNS – np. chwilowa niezdolność do rozwiązania nazwy domeny serwera poczty.
- problemy z certyfikatem serwera poczty
Powyższa lista to tylko przykłady. Powodów problemu może być więcej.
Wbudowany w Celery mechanizm ponawiania pozwala na próbowanie ponownie n razy, nim stwierdzi, że operacja nie ma szans powodzenia. Np. ten email możemy próbować wysłać 60 razy co minute. Jeśli jest to problem przejściowy - jest spora szansa, że w godzine będzie rozwiązany. Jeśli nie, no cóż, admin będzie miał robotę. Natomiast nasz admin będzie zdecydowanie bardziej zadowolony z zadania padającego po 60. ponowieniu, niż od razu, przy pierwszej próbie.
To wszystko brzmi pięknie, natomiast jest haczyk.
Żeby bezpiecznie korzystać z retry trzeba zwrócić uwagę, by zadania były "idempotentne". A co to znaczy? Posłużę się tutaj definicją:
Termin "idempotentny" odnosi się do właściwości operacji lub funkcji,
która pozwala na jej wielokrotne wykonanie bez zmiany końcowego wyniku po pierwszym zastosowaniu.
Innymi słowy, niezależnie od tego, ile razy wykonamy daną operację idempotentną, rezultat zawsze będzie taki sam jak po pierwszym wykonaniu.
ref: https://sjp.pl/idempotentny, https://stormit.pl/idempotent/
Przykłady operacji idempotentnych w informatyce:
- Bazy danych: Operacje takie, jak usuwanie rekordów czy aktualizacja danych są idempotentne. Wyjaśnienie: usuwanie zwłaszcza jednego rekordu nic więcej nie zmieni. Aktualizacja zawsze wpisze dane do bazy.
- REST API: Metody HTTP takie jak GET, PUT, DELETE są idempotentne, co oznacza, że wielokrotne wywołanie tych samych żądań nie zmienia stanu systemu po pierwszym wykonaniu. Wyjaśnienie: Podobnie jak w bazie danych: pobieranie danych (GET) nic nie zmienia, PUT to aktualizacja, a DELETE - jak wcześniej.
Wracając do naszego przykładu: czy wysyłka maila jest operacją idempotentną? Nie. Wywołamy ten mail.send() i nasz odbiorca dostanie 60 maili, bo było 60 prób ponownych (np. przez błąd w kodzie z naszej strony). Natomiast, w dość prosty sposób, możemy przerobić wysyłkę maili na operacje idempotentne:
- tworzymy zadanie wysłania maila i zapisujemy je w bazie danych
- do celery przekazujemy identyfikator tego zadania
- celery, podczas pracy, zapisuje stan zadania
- przygotowanie do wysyłki (renderowanie szablonu)
- połączenie z serwerem SMTP (pocztowym)
- potwierdzenie od serwera, że wysłano
- i kończy sukcesem
Teraz, w zależności od tego gdzie zadanie padło, możemy zdecydować o ponowieniu lub nie wysyłki. Jeśli problem był przy połączeniu z serwerem SMTP, to możemy śmiało ponawiać. Jeśli problem był z ostatnim zapisem do bazy, by oznaczyć żądanie wysyłki jako "sukces" to lepiej nie ponawiać - email już został wysłany, to nasz serwer bazy danych ma problem.
Najlepsze praktyki przy pracy z Celery
Monitorowanie i logowanie
Monitorowanie pracy workerów jest kluczowe dla utrzymania stabilności systemu. Narzędzie Flower umożliwia wizualizację statusu workerów oraz monitorowanie kolejek:
pip install flower
celery -A celery_blog flowerFlower działa pod adresem http://localhost:5555 (klinknij grafikę, aby powiększyć).
Obsługa błędów i wyjątków
Zawsze zabezpieczaj swoje zadania przed błędami za pomocą bloków try-except. Możesz także używać opcji automatycznych ponownych prób:
@shared_task(bind=True, max_retries=3)
def example_task(self):
try:
# Kod zadania
pass
except Exception as exc:
raise self.retry(exc=exc, countdown=5) # Ponowna próba po 5 sekundachNim użyjesz retry, przeczytaj sekcje "Niezawodność w Celery" powyżej.
Debugowanie
Debugowanie kodu wykonywanego w tle jest trudne. Zostaje przeglądanie logów, co nie zawsze pokazuje problem.
Rozwiązanie, które ja stosuje to jest:
- zapis zadania w bazie danych
- przekazanie identyfikatora zadania do Celery
- Celery, podczas pracy, dodaje do zadania log, więc w każdym momencie wiem, gdzie jestem.
Wygląda to tak:
# models.py Django
class XYZ(models.Model):
log = models.TextField(default="")
# pomijam pozostale pola
def _add_log(self, text):
"""
Metoda logująca - aktualizuje obiekt
"""
if isinstance(text, bytes):
text = text.decode("utf-8")
# dodaj `text` do już istniejącego logu
self.log += f"{datetime.now()} :: {text}\n"
# - znajdz bieżący obiekt
# - zaktualizuj tylko pole `log`
XYZ.objects.filter(id=self.id).update(log=self.log)Dzięki takiemu podejściu możesz zobaczyć logi zadania bezpośrednio w panelu admina. Tutaj baza może mocno rosnąć z uwagi na te logi, ale... można je po jakimś czasie automatycznie usuwać.
Periodic Tasks – Celery jako zamiennik crona
Jednym z najczęstszych zastosowań Celery jest wykonywanie okresowych zadań – podobnie jak cron w systemach Unixowych. Dzięki dodatkowi Celery Beat możesz łatwo zaplanować harmonogram dla swoich zadań.
Instalacja Celery Beat
Najpierw zainstaluj pakiet:
pip install django-celery-beatDodaj aplikację do INSTALLED_APPS w pliku settings.py:
INSTALLED_APPS += ['django_celery_beat']Następnie wykonaj migracje:
python manage.py migrate django_celery_beatW efekcie w adminie mamy nową sekcje "Periodic Tasks"
Konfiguracja harmonogramu
Po instalacji Celery Beat możesz zarządzać okresowymi zadaniami za pomocą panelu administracyjnego Django. Oto przykład:
- W panelu admina znajdź sekcję "Periodic Tasks".
- Utwórz nowe zadanie okresowe (np. codzienny raport):
- Wybierz nazwę swojego zadania (
tasks.example_task- to zadanie zdefiniowaliśmy w poprzednim wpisie.). - Ustaw interwał (np. codziennie o godzinie 8:00).
- Wybierz nazwę swojego zadania (
Kliknij grafikę, aby powiększyć:
![]()
Jak widać, mamy bardzo wiele opcji uruchamiania tych zadań. Tutaj stworzyliśmy zadanie uruchamiane codziennie o 8 rano. Jak w cronie.
Przykład kodu dla okresowego zadania
Jeśli chcesz dodać harmonogram programowo (bez panelu admina), oto przykład:
from django_celery_beat.models import PeriodicTask, IntervalSchedule
import json
# Dodaj interwał (np. co godzinę)
schedule, created = IntervalSchedule.objects.get_or_create(
every=1,
period=IntervalSchedule.HOURS,
)
# Dodaj okresowe zadanie
PeriodicTask.objects.create(
interval=schedule,
name='Uruchomienie example_task co godzinę',
task='myapp.tasks.example_task',
args=json.dumps([10]) # Argumenty dla zadania (np. czas trwania)
)Kliknij grafikę, aby powiększyć:
![]()
Teraz Twoje okresowe zadanie będzie automatycznie uruchamiane zgodnie z harmonogramem! Natomiast... Celery nie jest potrzebne do obsługi zadań okresowych. Można to rozwiązać na kilka innych sposobów, które omówię w kolejnym poście. Problem z zadaniami w adminie jest taki, że osoby z dostępem do admina mogą modyfikować harmonogram. A to może nie być dobry pomysł - mogą namieszać.
Podsumowanie
W tym poście zagłębiłem się w zaawansowane aspekty pracy z Celery w Django. Omówiłem tworzenie i wykonywanie zadań, wykorzystując dekoratory @shared_task i metody delay() oraz apply_async(). Zwróciłem szczególną uwagę na kluczową kwestię niezawodności, podkreślając znaczenie idempotentności zadań przy korzystaniu z mechanizmu automatycznych ponownych prób.
Przedstawiłem najlepsze praktyki, takie jak monitorowanie z użyciem Flower i efektywne debugowanie zadań asynchronicznych. Zaprezentowałem również Celery Beat, jako potężne narzędzie do planowania zadań okresowych, zastępujące tradycyjnego crona.
Pamiętajcie, że choć Celery oferuje wiele możliwości, nie jest jedynym rozwiązaniem dla zadań okresowych. W kolejnym poście omówię alternatywne podejścia do tego zagadnienia.
Kod projektu, który przygotowałem, jest dostępny na moim GitHubie: https://github.com/pymasterspl/celery_blog
W kolejnym poście omówimy alternatywy dla Celery i kiedy je stosować. Już w lutym!
Dołącz do społeczności PyMasters, gdzie możesz wziąć udział w ciekawych projektach i podnosić swoje umiejętności w Pythonie i Django.
Pobierz mojego bezpłanego ebooka: WARSZTAT JUNIORA Przewodnik po kluczowych kompetencjach i narzędziach dla początkującego programisty Pythona
Zapraszam do zadawania pytań przez formularz kontaktowy. Pamiętaj, że jeśli potrzebujesz wsparcia, możesz napisać do mnie - pomogę.
Spodobał Ci się post?
Podziel się nim!
Masz uwagi do posta, chcesz porozmawiać, szukasz pomocy z Pythonem i Django? Napisz do mnie!