Django i…rynsztok?

No dobra, może to nie rynsztok, ale kanał…i to nie taki o jakim myślicie! Pewnie nie raz mieliście taką sytuację używając Django, że jakaś operacja jest mocno czasochłonna. Przykładowo, generujecie jakiś raport do Excela, albo plik pdf. W większości takich przypadków dobrze jest to zlecić Celery aby taka operacja wykonywała się w tle. Ale pojawia się pytanie…jak powiadomić użytkownika, że zadanie które zlecił Celery zostało zakończone? Jest wiele opcji, ale oczywiście nie idziemy na łatwiznę wiec nie użyjemy zwykłych zapytań (long polling) czy SSE 😉 W tym wpisie pokażę Wam jak ogarnąć WebSockery w Django, dzięki Django Channels.

Scooby doo, gdzie jesteś?

Scoobiego nie ma ale za to będzie Daphne!Czyli nasz serwer ASGI którego użyjemy razem z django channels. Pozwoli nam on obsłużyć jednocześnie ruch HTTP oraz WebSockety.

Czego dokładnie użyjemy?

Skorzystamy z Celery, Redisa, Postgresa oraz Django 😉 Całość uruchomimy sobie w dockerze aby wszystko się łatwo „samo postawiło”.

Konfiguracja i kod

Na sam początek musimy zainstalować zależności. Później skonfigurować Django aby używało ASGI oraz Django Channels. W pliku konfiguracyjnym domyślnie mamy podaną konfigurację dla obsługi WSGI, musimy to zmienić na ASGI oraz dodać konfigurację Channels jak i dodać Daphne do listy naszych aplikacji. Ważne aby zrobić to na samej górze aby całość działała poprawnie. Daphne będzie teraz obsługiwała ruch nawet jeśli użyjemy domyślnego „runservera”. Konfiguracja będzie wyglądała mniej więcej tak:

Plik konfiguracyjny settings.py.

INSTALLED_APPS = [
    "daphne",
    ...
    "channels"
]

ASGI_APPLICATION = "config.asgi.application"

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [(os.getenv("REDIS_HOST", "redis"), 6379)]},
    }
}

Nasz plik asgi.py w którym zdefiniujemy jak ma być obsługiwany ruch HTTP oraz WebSockety.

import os

import django
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

from .routing import websocket_urlpatterns

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
django.setup()

application = ProtocolTypeRouter(
    {
        "http": get_asgi_application(),
        "websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
    }
)

Kolejnym krokiem będzie stworzenie konsumenta wiadomości. Zróbmy też jakieś proste zabezpieczenia aby użytkownik nie mógł się podłączyć do kanału innego użytkownika. W tym kroku definiujemy logikę połączeń oraz metodę której będziemy używać do wysyłania danych.

import json
from channels.generic.websocket import AsyncWebsocketConsumer


class NotificationConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.user_id = int(self.scope["url_route"]["kwargs"]["user_id"])
        self.user = self.scope["user"]

        if not self.user.is_authenticated or self.user.id != self.user_id:
            await self.close()
            return

        self.group_name = f"user_{self.user_id}"
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def send_report_ready(self, event):
        await self.send(
            text_data=json.dumps(
                {
                    "message": "Raport jest gotowy do pobrania.",
                    "report_url": event["report_url"],
                }
            )
        )

Następnie plik z naszymi urlami dla websocketów (routing.py).

from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    re_path(
        r"ws/notifications/(?P<user_id>\d+)/$", consumers.NotificationConsumer.as_asgi()
    ),
]

Zróbmy sobie prosty task który będzie udawał, czasochłonną operację (generowanie raportu). Załóżmy sobie testowo 5 sekund na operację a po jej zakończeniu prześlemy użytkownikowi link do raportu.

import time
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer


@shared_task
def generate_report(user_id):
    time.sleep(5)
    report_url = f"/media/reports/report_{user_id}.pdf"

    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f"user_{user_id}",
        {
            "type": "send_report_ready",
            "report_url": report_url,
        },
    )

A na froncie użyjmy super prostego html który wyświetli nam „piękny” alert po zakończeniu taska 😉

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Example</title>
</head>
<body>
Test

<script>
  const userId = {{ user.id }};
  const socket = new WebSocket(`ws://${window.location.host}/ws/notifications/${userId}/`);

  socket.onmessage = function(e) {
    const data = JSON.parse(e.data);
    alert(data.message + " Link: " + data.report_url);
  }
</script>
</body>
</html>

A kod odpowiedzialny za ten widok będzie następujący. Na potrzeby tego artykułu załóżmy, że po prostu po wejściu na nasz home page, zadanie się uruchomi – tak wiem, jak wejdziemy wiele razy to uruchomimy wiele zadań 😉 Spokojnie, to jest tylko na potrzeby edukacyjne. Oczywiście użytkownik musi być zalogowany żeby całość działała!

from django.contrib.auth.mixins import LoginRequiredMixin
from django.views.generic import TemplateView
from website.tasks import generate_report


class HomePageView(LoginRequiredMixin, TemplateView):
    template_name = "website/home.html"

    def get(self, request, *args, **kwargs):
        generate_report.delay(request.user.id)

        return super().get(request, *args, **kwargs)

Podsumowanie

Całość zadziała w taki sposób, że po wejściu na adres i odczekaniu około 5 sekund dostaniemy alert z komunikatem oraz linkiem do naszego „raportu”. Wizualnie zdecydowanie szału nie ma, ale to tylko na potrzeby w miarę szybkiego pokazania jak można zaimplementować takie rozwiazanie. Jeśli chcecie sobie poeksperymentować, to wrzucam linka do repozytorium na githubie. Wystarczy odpalić dockera jednym poleceniem i całość powinna się uruchomić.

Zaawansowany sytem powiadomień 😉

Photo by Zoshua Colah on Unsplash