Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,9 @@ back-poetry-tree: ## show dependencies as a tree
@$(COMPOSE) run --rm --build backend-dev pipdeptree
.PHONY: back-poetry-tree

pip-audit: ## check the dependencies
@$(COMPOSE) run --rm --no-deps -e HOME=/tmp --build backend-dev pip-audit
pip-audit: ## check the dependencies
# https://github.com/pypa/pip/issues/13607
@$(COMPOSE) run --rm --no-deps -e HOME=/tmp --build backend-dev pip-audit --ignore-vuln GHSA-4xh5-x5gv-qwph
.PHONY: pip-audit

collectstatic: ## collect static files
Expand Down
3 changes: 2 additions & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
web: bin/scalingo_run_web
worker: celery -A messages.celery_app worker --task-events --beat -l INFO -c $CELERY_CONCURRENCY -Q celery,default
worker: python manage.py worker
scheduler: python manage.py crontab
postdeploy: python manage.py migrate
30 changes: 9 additions & 21 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ services:
target: poetry
pull_policy: build

celery-dev:
worker-dev:
build:
context: src/backend
target: runtime-dev
args:
DOCKER_USER: ${DOCKER_USER:-1000}
user: ${DOCKER_USER:-1000}
command: ["celery", "-A", "messages.celery_app", "worker", "-l", "DEBUG", "-Q", "celery,default"]
command: ["python", "manage.py", "worker", "-p", "2", "-t", "2"]
environment:
- DJANGO_CONFIGURATION=Development
env_file:
Expand All @@ -177,39 +177,27 @@ services:
- ./data/static:/data/static
depends_on:
- backend-dev
- redis

celery-ui:
scheduler-dev:
build:
context: src/backend
target: runtime-dev
args:
DOCKER_USER: ${DOCKER_USER:-1000}
user: ${DOCKER_USER:-1000}
depends_on:
- redis
command: ["python", "manage.py", "crontab"]
environment:
- FLOWER_UNAUTHENTICATED_API=true
- DJANGO_CONFIGURATION=Development
env_file:
- env.d/development/backend.defaults
- env.d/development/backend.local
volumes:
- ./src/backend:/app
ports:
- "8903:8803"
command: celery -A messages.celery_app flower --port=8803

# nginx:
# image: nginx:1.25
# ports:
# - "8083:8083"
# volumes:
# - ./docker/files/development/etc/nginx/conf.d:/etc/nginx/conf.d:ro
# depends_on:
# - keycloak
# - backend-dev
# - mta-in
# - mta-out
- ./data/static:/data/static
depends_on:
- backend-dev
- redis

frontend-dev:
user: "${DOCKER_USER:-1000}"
Expand Down
2 changes: 1 addition & 1 deletion docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ The application uses a new environment file structure with `.defaults` and `.loc
| Variable | Default | Description | Required |
|----------|---------|-------------|----------|
| `REDIS_URL` | `redis://redis:6379` | Redis connection URL (internal) | Optional |
| `CELERY_BROKER_URL` | `redis://redis:6379` | Celery message broker URL (internal) | Optional |
| `DRAMATIQ_BROKER_URL` | `redis://redis:6379` | Dramatiq message broker URL (internal) | Optional |
| `CACHES_DEFAULT_TIMEOUT` | `30` | Default cache timeout in seconds | Optional |

**Note**: For external Redis access, use `localhost:8913`. For internal container communication, use `redis:6379`.
Expand Down
4 changes: 4 additions & 0 deletions env.d/development/backend.defaults
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ METRICS_API_KEY=ExampleMetricsApiKey
# Python
PYTHONPATH=/app

# Dramatiq
DRAMATIQ_BROKER_URL=redis://redis:6379
DRAMATIQ_CRONTAB_REDIS_URL=redis://redis:6379/0

# Messages settings

# Mail
Expand Down
6 changes: 3 additions & 3 deletions src/backend/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# https://hub.docker.com/_/python
FROM python:3.13.7-slim-trixie AS base
FROM python:3.13.9-slim-trixie AS base

# Bump this to force an update of the apt repositories
ENV MIN_UPDATE_DATE="2025-09-02"
ENV MIN_UPDATE_DATE="2025-10-20"

RUN apt-get update && apt-get upgrade -y \
&& rm -rf /var/lib/apt/lists/*
Expand All @@ -22,7 +22,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
ENV POETRY_NO_INTERACTION=1
ENV POETRY_VIRTUALENVS_CREATE=0
ENV POETRY_VIRTUALENVS_OPTIONS_NO_PIP=1
ENV POETRY_VERSION=2.1.4
ENV POETRY_VERSION=2.2.1

RUN python -m pip install poetry==${POETRY_VERSION}

Expand Down
79 changes: 78 additions & 1 deletion src/backend/core/admin.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
"""Admin classes and registrations for core app."""

from django.contrib import admin, messages
from django.contrib.admin.views.decorators import staff_member_required
from django.contrib.auth import admin as auth_admin
from django.core.files.storage import storages
from django.http import HttpResponse
from django.shortcuts import redirect
from django.template.response import TemplateResponse
from django.urls import path
from django.urls import path, reverse
from django.utils.html import escape, format_html
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _

import dramatiq
import dramatiq_dashboard

from core.api.utils import get_file_key
from core.services.importer import ImportService

Expand Down Expand Up @@ -694,3 +699,75 @@ def get_html_body(self, obj):
return obj.html_body

get_html_body.short_description = "HTML Body"


# Dramatiq Dashboard Integration
@staff_member_required
def dramatiq_dashboard_view(request):
"""Serve the Dramatiq dashboard for staff users only."""
# Get the broker from django-dramatiq
broker = dramatiq.get_broker()

# Create the dashboard app
dashboard_app = dramatiq_dashboard.DashboardApp(broker=broker, prefix="")

# Create a WSGI environment
environ = {
"REQUEST_METHOD": request.method,
"PATH_INFO": request.path_info,
"QUERY_STRING": request.META.get("QUERY_STRING", ""),
"CONTENT_TYPE": request.META.get("CONTENT_TYPE", ""),
"CONTENT_LENGTH": request.META.get("CONTENT_LENGTH", ""),
"HTTP_HOST": request.META.get("HTTP_HOST", ""),
"SERVER_NAME": request.META.get("SERVER_NAME", ""),
"SERVER_PORT": request.META.get("SERVER_PORT", ""),
"wsgi.url_scheme": request.scheme,
"wsgi.input": request,
"wsgi.errors": request,
"wsgi.version": (1, 0),
"wsgi.multithread": True,
"wsgi.multiprocess": False,
"wsgi.run_once": False,
}

# Add HTTP headers
for key, value in request.META.items():
if key.startswith("HTTP_"):
environ[key] = value

# Call the dashboard app
def start_response(status, response_headers): # pylint: disable=unused-argument
# This will be called by the WSGI app
pass

# Get the response from the dashboard
response_body = dashboard_app(environ, start_response)

# Create Django response
response = HttpResponse(b"".join(response_body))
response.status_code = 200
response["Content-Type"] = "text/html; charset=utf-8"

return response

Comment on lines +704 to +752
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

WSGI bridge ignores status/headers and sets an invalid wsgi.input; prefix likely wrong.

Capture status/headers from start_response, build environ from request.META with a real byte stream, and set DashboardApp prefix to the mounted path for correct links.

 @staff_member_required
 def dramatiq_dashboard_view(request):
@@
-    dashboard_app = dramatiq_dashboard.DashboardApp(broker=broker, prefix="")
+    # Mount under the admin route so asset/links resolve
+    mount_path = reverse("admin:dramatiq_dashboard").rstrip("/")
+    dashboard_app = dramatiq_dashboard.DashboardApp(broker=broker, prefix=mount_path)
@@
-    environ = {
-        "REQUEST_METHOD": request.method,
-        "PATH_INFO": request.path_info,
-        "QUERY_STRING": request.META.get("QUERY_STRING", ""),
-        "CONTENT_TYPE": request.META.get("CONTENT_TYPE", ""),
-        "CONTENT_LENGTH": request.META.get("CONTENT_LENGTH", ""),
-        "HTTP_HOST": request.META.get("HTTP_HOST", ""),
-        "SERVER_NAME": request.META.get("SERVER_NAME", ""),
-        "SERVER_PORT": request.META.get("SERVER_PORT", ""),
-        "wsgi.url_scheme": request.scheme,
-        "wsgi.input": request,
-        "wsgi.errors": request,
-        "wsgi.version": (1, 0),
-        "wsgi.multithread": True,
-        "wsgi.multiprocess": False,
-        "wsgi.run_once": False,
-    }
+    # Start from Django's META and fix required keys
+    import io
+    environ = dict(request.META)
+    environ.setdefault("REQUEST_METHOD", request.method)
+    environ["PATH_INFO"] = request.path_info
+    environ.setdefault("QUERY_STRING", "")
+    environ["SCRIPT_NAME"] = mount_path
+    environ["wsgi.url_scheme"] = request.scheme
+    environ["wsgi.input"] = io.BytesIO(request.body or b"")
+    environ["wsgi.errors"] = io.BytesIO()
+    environ["wsgi.version"] = (1, 0)
+    environ["wsgi.multithread"] = True
+    environ["wsgi.multiprocess"] = False
+    environ["wsgi.run_once"] = False
@@
-    # Add HTTP headers
-    for key, value in request.META.items():
-        if key.startswith("HTTP_"):
-            environ[key] = value
-
-    # Call the dashboard app
-    def start_response(status, response_headers):  # pylint: disable=unused-argument
-        # This will be called by the WSGI app
-        pass
+    # Call the dashboard app
+    status_holder = {"status": "200 OK", "headers": []}
+    def start_response(status, response_headers, exc_info=None):
+        status_holder["status"] = status
+        status_holder["headers"] = response_headers
+        return lambda x: None
@@
-    response_body = dashboard_app(environ, start_response)
-
-    # Create Django response
-    response = HttpResponse(b"".join(response_body))
-    response.status_code = 200
-    response["Content-Type"] = "text/html; charset=utf-8"
+    response_iter = dashboard_app(environ, start_response)
+    body = b"".join(response_iter)
+    status_code = int(status_holder["status"].split()[0])
+    response = HttpResponse(body, status=status_code)
+    for (k, v) in status_holder["headers"]:
+        # Let Django manage Content-Length
+        if k.lower() != "content-length":
+            response[k] = v
@@
     return response


class CoreAdminSite(admin.AdminSite):
"""Custom admin site with Dramatiq dashboard integration."""

def get_urls(self):
"""Add Dramatiq dashboard URL to admin URLs."""
urls = super().get_urls()
custom_urls = [
path("dramatiq/", dramatiq_dashboard_view, name="dramatiq_dashboard"),
]
return custom_urls + urls

def index(self, request, extra_context=None):
"""Add Dramatiq dashboard link to admin index."""
extra_context = extra_context or {}
extra_context["dramatiq_dashboard_url"] = reverse("admin:dramatiq_dashboard")
return super().index(request, extra_context)


# Create custom admin site instance
admin_site = CoreAdminSite(name="admin")
4 changes: 2 additions & 2 deletions src/backend/core/api/viewsets/import_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ImportViewSet(viewsets.ViewSet):
request=ImportFileSerializer,
responses={
202: OpenApiResponse(
description="Import started. Returns Celery task ID for tracking.",
description="Import started. Returns task ID for tracking.",
response={
"type": "object",
"properties": {
Expand Down Expand Up @@ -96,7 +96,7 @@ def import_file(self, request):
request=ImportIMAPSerializer,
responses={
202: OpenApiResponse(
description="IMAP import started. Returns Celery task ID for tracking the import progress.",
description="IMAP import started. Returns task ID for tracking the import progress.",
response={
"type": "object",
"properties": {
Expand Down
4 changes: 2 additions & 2 deletions src/backend/core/api/viewsets/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def post(self, request):
)

# Launch async task for sending the message
task = send_message_task.delay(str(message.id), must_archive=must_archive)
task = send_message_task.send(str(message.id), must_archive=must_archive)

# --- Finalize ---
# Message state should be updated by prepare_outbound_message/send_message
Expand All @@ -130,4 +130,4 @@ def post(self, request):
# Update thread stats after un-drafting
message.thread.update_stats()

return Response({"task_id": task.id}, status=status.HTTP_200_OK)
return Response({"task_id": task.message_id}, status=status.HTTP_200_OK)
96 changes: 62 additions & 34 deletions src/backend/core/api/viewsets/task.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""API ViewSet for Celery task status."""
"""API ViewSet for asynchronous task statuses."""

import logging

from celery import states as celery_states
from celery.result import AsyncResult
import dramatiq
from dramatiq.results import ResultFailure, ResultMissing, ResultTimeout
from drf_spectacular.utils import (
OpenApiExample,
extend_schema,
Expand All @@ -14,10 +14,13 @@
from rest_framework.response import Response
from rest_framework.views import APIView

from messages.celery_app import app as celery_app
from core.utils import get_task_progress

logger = logging.getLogger(__name__)

# Map Dramatiq states to Celery-like states for frontend compatibility
DRAMATIQ_STATES = ["PENDING", "SUCCESS", "FAILURE", "RETRY", "REJECTED", "PROGRESS"]


@extend_schema(
tags=["tasks"],
Expand All @@ -34,9 +37,7 @@
200: inline_serializer(
name="TaskStatusResponse",
fields={
"status": drf_serializers.ChoiceField(
choices=sorted({*celery_states.ALL_STATES, "PROGRESS"})
),
"status": drf_serializers.ChoiceField(choices=sorted(DRAMATIQ_STATES)),
"result": drf_serializers.JSONField(allow_null=True),
"error": drf_serializers.CharField(allow_null=True),
},
Expand All @@ -59,38 +60,65 @@
],
)
class TaskDetailView(APIView):
"""View to retrieve the status of a Celery task."""
"""View to retrieve the status of a task."""

permission_classes = [permissions.IsAuthenticated]

def get(self, request, task_id):
"""Get the status of a Celery task."""

task_result = AsyncResult(task_id, app=celery_app)
"""Get the status of a task."""
# Try to fetch a result from Dramatiq's result backend
broker = dramatiq.get_broker()
result_backend = broker.get_results_backend()

# By default unknown tasks will be in PENDING. There is no reliable
# way to check if a task exists or not with Celery.
# https://github.com/celery/celery/issues/3596#issuecomment-262102185
if result_backend is not None:
try:
# Retrieve a Message for this task id from the backend, then get result
# See Dramatiq results API: message.get_result(...)
message = result_backend.get_message(task_id)
result = message.get_result(backend=result_backend, block=False)
return Response(
Comment on lines +70 to +79
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify results-backend access pattern.

broker.get_results_backend() and backend.get_message(...) must match your configured middleware/version. If not available, fetch the Results middleware and read its backend instead.


🌐 Web query:

In Dramatiq 1.18, what is the supported way to retrieve the results backend at runtime from a Broker, and does RedisBackend expose get_message(message_id)?

💡 Result:

  • Supported way: call Broker.get_results_backend() to fetch the Results middleware backend at runtime (raises RuntimeError if none). [1]

  • RedisBackend: it does not expose a get_message(message_id) API. The RedisBackend implements get_result(message) (and lower-level _get/_store) and uses a message object to build the storage key, so you must provide a Message (or use message.get_result / the backend.get_result APIs) rather than a standalone get_message(message_id). [1][2]

Sources: [1] Dramatiq Broker API (get_results_backend). [2] Dramatiq RedisBackend / results backend (get_result, build_message_key, _get/_store).


Fix incorrect RedisBackend API usage—get_message(task_id) doesn't exist.

RedisBackend does not expose a get_message(message_id) API. The code at lines 70-79 calls result_backend.get_message(task_id), which will fail at runtime. You must provide a Message object to use backend.get_result(message) or message.get_result() instead of trying to fetch messages by task_id alone.

Retrieve the Message from Dramatiq's actor system or message store, then pass it to the backend's get_result() method.

🤖 Prompt for AI Agents
In src/backend/core/api/viewsets/task.py around lines 70 to 79, the code calls
result_backend.get_message(task_id) which is not part of RedisBackend's API and
will fail; replace that call by using the backend's result retrieval API
directly (e.g. call result_backend.get_result(task_id) if the backend supports
fetching by id) or, if the backend requires a Message object, obtain/create the
dramatiq Message instance representing this task (from the actor/message store
or the original enqueue flow) and pass that Message into
result_backend.get_result(message) (or call
message.get_result(backend=result_backend)). Update imports and error handling
accordingly.

{
"status": "SUCCESS",
"result": result,
"error": None,
}
)
except ResultMissing:
# No result yet; fall through to progress/pending logic
pass
except ResultFailure as exc:
return Response(
{
"status": "FAILURE",
"result": None,
"error": str(exc),
}
)
except ResultTimeout as exc:
# Treat timeouts as pending
logger.debug("Result timeout for task %s: %s", task_id, exc)

# Prepare the response data
result_data = {
"status": task_result.status,
"result": None,
"error": None,
}
# Check if we have progress data for this task
progress_data = get_task_progress(task_id)

# If the result is a dict with status/result/error, unpack and propagate status
if isinstance(task_result.result, dict) and set(task_result.result.keys()) >= {
"status",
"result",
"error",
}:
result_data["status"] = task_result.result["status"]
result_data["result"] = task_result.result["result"]
result_data["error"] = task_result.result["error"]
else:
result_data["result"] = task_result.result
if task_result.state == "PROGRESS" and task_result.info:
result_data.update(task_result.info)
if progress_data:
# Task is in progress
return Response(
{
"status": "PROGRESS",
"result": None,
"error": None,
"progress": progress_data.get("progress"),
"message": progress_data.get("metadata", {}).get("message"),
"timestamp": progress_data.get("timestamp"),
}
)

return Response(result_data)
# Default to pending when no result and no progress
return Response(
{
"status": "PENDING",
"result": None,
"error": None,
}
)
Loading
Loading