diff --git a/Makefile b/Makefile
index aeb972498..8753203ee 100644
--- a/Makefile
+++ b/Makefile
@@ -324,8 +324,9 @@ back-poetry-tree: ## show dependencies as a tree
@$(COMPOSE) run --rm --build backend-dev pipdeptree
.PHONY: back-poetry-tree
-pip-audit: ## check the dependencies
- @$(COMPOSE) run --rm --no-deps -e HOME=/tmp --build backend-dev pip-audit
+pip-audit: ## check the dependencies
+ # https://github.com/pypa/pip/issues/13607
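+# Drop the --ignore-vuln flag once the pip issue above is fixed upstream.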
+ @$(COMPOSE) run --rm --no-deps -e HOME=/tmp --build backend-dev pip-audit --ignore-vuln GHSA-4xh5-x5gv-qwph
.PHONY: pip-audit
collectstatic: ## collect static files
diff --git a/Procfile b/Procfile
index 13c5fdbb6..b40827f0f 100644
--- a/Procfile
+++ b/Procfile
@@ -1,3 +1,4 @@
web: bin/scalingo_run_web
-worker: celery -A messages.celery_app worker --task-events --beat -l INFO -c $CELERY_CONCURRENCY -Q celery,default
+worker: python manage.py worker
+scheduler: python manage.py crontab
postdeploy: python manage.py migrate
\ No newline at end of file
diff --git a/compose.yaml b/compose.yaml
index df59e2bd4..1103f16f6 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -159,14 +159,14 @@ services:
target: poetry
pull_policy: build
- celery-dev:
+ worker-dev:
build:
context: src/backend
target: runtime-dev
args:
DOCKER_USER: ${DOCKER_USER:-1000}
user: ${DOCKER_USER:-1000}
- command: ["celery", "-A", "messages.celery_app", "worker", "-l", "DEBUG", "-Q", "celery,default"]
+ command: ["python", "manage.py", "worker", "-p", "2", "-t", "2"]
environment:
- DJANGO_CONFIGURATION=Development
env_file:
@@ -177,39 +177,27 @@ services:
- ./data/static:/data/static
depends_on:
- backend-dev
+ - redis
- celery-ui:
+ scheduler-dev:
build:
context: src/backend
target: runtime-dev
args:
DOCKER_USER: ${DOCKER_USER:-1000}
user: ${DOCKER_USER:-1000}
- depends_on:
- - redis
+ command: ["python", "manage.py", "crontab"]
environment:
- - FLOWER_UNAUTHENTICATED_API=true
- DJANGO_CONFIGURATION=Development
env_file:
- env.d/development/backend.defaults
- env.d/development/backend.local
volumes:
- ./src/backend:/app
- ports:
- - "8903:8803"
- command: celery -A messages.celery_app flower --port=8803
-
- # nginx:
- # image: nginx:1.25
- # ports:
- # - "8083:8083"
- # volumes:
- # - ./docker/files/development/etc/nginx/conf.d:/etc/nginx/conf.d:ro
- # depends_on:
- # - keycloak
- # - backend-dev
- # - mta-in
- # - mta-out
+ - ./data/static:/data/static
+ depends_on:
+ - backend-dev
+ - redis
frontend-dev:
user: "${DOCKER_USER:-1000}"
diff --git a/docs/env.md b/docs/env.md
index 9a7d9c7fb..c4332a4e5 100644
--- a/docs/env.md
+++ b/docs/env.md
@@ -60,7 +60,7 @@ The application uses a new environment file structure with `.defaults` and `.loc
| Variable | Default | Description | Required |
|----------|---------|-------------|----------|
| `REDIS_URL` | `redis://redis:6379` | Redis connection URL (internal) | Optional |
-| `CELERY_BROKER_URL` | `redis://redis:6379` | Celery message broker URL (internal) | Optional |
+| `DRAMATIQ_BROKER_URL` | `redis://redis:6379` | Dramatiq message broker URL (internal) | Optional |
| `CACHES_DEFAULT_TIMEOUT` | `30` | Default cache timeout in seconds | Optional |
**Note**: For external Redis access, use `localhost:8913`. For internal container communication, use `redis:6379`.
diff --git a/env.d/development/backend.defaults b/env.d/development/backend.defaults
index 7d52973c2..404f63895 100644
--- a/env.d/development/backend.defaults
+++ b/env.d/development/backend.defaults
@@ -27,6 +27,10 @@ METRICS_API_KEY=ExampleMetricsApiKey
# Python
PYTHONPATH=/app
+# Dramatiq
+DRAMATIQ_BROKER_URL=redis://redis:6379
+DRAMATIQ_CRONTAB_REDIS_URL=redis://redis:6379/0
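+# Redis instance used by dramatiq-crontab (e.g. for its scheduler lock); shares the broker's Redis here.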
+
# Messages settings
# Mail
diff --git a/src/backend/Dockerfile b/src/backend/Dockerfile
index b4e66dbc4..ac62a4a6c 100644
--- a/src/backend/Dockerfile
+++ b/src/backend/Dockerfile
@@ -1,8 +1,8 @@
# https://hub.docker.com/_/python
-FROM python:3.13.7-slim-trixie AS base
+FROM python:3.13.9-slim-trixie AS base
# Bump this to force an update of the apt repositories
-ENV MIN_UPDATE_DATE="2025-09-02"
+ENV MIN_UPDATE_DATE="2025-10-20"
RUN apt-get update && apt-get upgrade -y \
&& rm -rf /var/lib/apt/lists/*
@@ -22,7 +22,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
ENV POETRY_NO_INTERACTION=1
ENV POETRY_VIRTUALENVS_CREATE=0
ENV POETRY_VIRTUALENVS_OPTIONS_NO_PIP=1
-ENV POETRY_VERSION=2.1.4
+ENV POETRY_VERSION=2.2.1
RUN python -m pip install poetry==${POETRY_VERSION}
diff --git a/src/backend/core/admin.py b/src/backend/core/admin.py
index d8bb18ab7..694190764 100644
--- a/src/backend/core/admin.py
+++ b/src/backend/core/admin.py
@@ -1,15 +1,20 @@
"""Admin classes and registrations for core app."""
+import sys
+
from django.contrib import admin, messages
+from django.contrib.admin.views.decorators import staff_member_required
from django.contrib.auth import admin as auth_admin
from django.core.files.storage import storages
+from django.http import HttpResponse
from django.shortcuts import redirect
from django.template.response import TemplateResponse
-from django.urls import path
+from django.urls import path, reverse
from django.utils.html import escape, format_html
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
+import dramatiq
+import dramatiq_dashboard
+
from core.api.utils import get_file_key
from core.services.importer import ImportService
@@ -694,3 +699,75 @@ def get_html_body(self, obj):
return obj.html_body
get_html_body.short_description = "HTML Body"
+
+
+# Dramatiq Dashboard Integration
+@staff_member_required
+def dramatiq_dashboard_view(request):
+ """Serve the Dramatiq dashboard for staff users only."""
+ # Get the broker from django-dramatiq
+ broker = dramatiq.get_broker()
+
+ # Create the dashboard app
+ dashboard_app = dramatiq_dashboard.DashboardApp(broker=broker, prefix="")
+
+ # Create a WSGI environment
+ environ = {
+ "REQUEST_METHOD": request.method,
+ "PATH_INFO": request.path_info,
+ "QUERY_STRING": request.META.get("QUERY_STRING", ""),
+ "CONTENT_TYPE": request.META.get("CONTENT_TYPE", ""),
+ "CONTENT_LENGTH": request.META.get("CONTENT_LENGTH", ""),
+ "HTTP_HOST": request.META.get("HTTP_HOST", ""),
+ "SERVER_NAME": request.META.get("SERVER_NAME", ""),
+ "SERVER_PORT": request.META.get("SERVER_PORT", ""),
+ "wsgi.url_scheme": request.scheme,
+ "wsgi.input": request,
+ "wsgi.errors": request,
+ "wsgi.version": (1, 0),
+ "wsgi.multithread": True,
+ "wsgi.multiprocess": False,
+ "wsgi.run_once": False,
+ }
+
+ # Add HTTP headers
+ for key, value in request.META.items():
+ if key.startswith("HTTP_"):
+ environ[key] = value
+
+    # Call the dashboard app, capturing the status and headers it reports
+    # instead of assuming a 200 response
+    wsgi_meta = {}
+
+    def start_response(status, response_headers):
+        wsgi_meta["status"] = status
+        wsgi_meta["headers"] = response_headers
+
+    # Get the response from the dashboard
+    response_body = dashboard_app(environ, start_response)
+
+    # Create Django response, propagating the status and headers from the app
+    response = HttpResponse(b"".join(response_body))
+    response.status_code = int(wsgi_meta.get("status", "200 OK").split(" ", 1)[0])
+    response["Content-Type"] = "text/html; charset=utf-8"
+    for header, value in wsgi_meta.get("headers", []):
+        response[header] = value
+
+ return response
+
+
+class CoreAdminSite(admin.AdminSite):
+ """Custom admin site with Dramatiq dashboard integration."""
+
+ def get_urls(self):
+ """Add Dramatiq dashboard URL to admin URLs."""
+ urls = super().get_urls()
+ custom_urls = [
+ path("dramatiq/", dramatiq_dashboard_view, name="dramatiq_dashboard"),
+ ]
+ return custom_urls + urls
+
+ def index(self, request, extra_context=None):
+ """Add Dramatiq dashboard link to admin index."""
+ extra_context = extra_context or {}
+ extra_context["dramatiq_dashboard_url"] = reverse("admin:dramatiq_dashboard")
+ return super().index(request, extra_context)
+
+
+# Create custom admin site instance
+admin_site = CoreAdminSite(name="admin")
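+
+# NOTE: for "admin:dramatiq_dashboard" to resolve, the project's urls.py is
+# expected to mount this instance (e.g. path("admin/", admin_site.urls)).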
diff --git a/src/backend/core/api/viewsets/import_message.py b/src/backend/core/api/viewsets/import_message.py
index 7f127e099..67cd9a0ac 100644
--- a/src/backend/core/api/viewsets/import_message.py
+++ b/src/backend/core/api/viewsets/import_message.py
@@ -43,7 +43,7 @@ class ImportViewSet(viewsets.ViewSet):
request=ImportFileSerializer,
responses={
202: OpenApiResponse(
- description="Import started. Returns Celery task ID for tracking.",
+ description="Import started. Returns task ID for tracking.",
response={
"type": "object",
"properties": {
@@ -96,7 +96,7 @@ def import_file(self, request):
request=ImportIMAPSerializer,
responses={
202: OpenApiResponse(
- description="IMAP import started. Returns Celery task ID for tracking the import progress.",
+ description="IMAP import started. Returns task ID for tracking the import progress.",
response={
"type": "object",
"properties": {
diff --git a/src/backend/core/api/viewsets/send.py b/src/backend/core/api/viewsets/send.py
index 55e98e645..d9b0ed4d9 100644
--- a/src/backend/core/api/viewsets/send.py
+++ b/src/backend/core/api/viewsets/send.py
@@ -120,7 +120,7 @@ def post(self, request):
)
# Launch async task for sending the message
- task = send_message_task.delay(str(message.id), must_archive=must_archive)
+ task = send_message_task.send(str(message.id), must_archive=must_archive)
# --- Finalize ---
# Message state should be updated by prepare_outbound_message/send_message
@@ -130,4 +130,4 @@ def post(self, request):
# Update thread stats after un-drafting
message.thread.update_stats()
- return Response({"task_id": task.id}, status=status.HTTP_200_OK)
+ return Response({"task_id": task.message_id}, status=status.HTTP_200_OK)
diff --git a/src/backend/core/api/viewsets/task.py b/src/backend/core/api/viewsets/task.py
index e55e4fca1..3af4c6f8f 100644
--- a/src/backend/core/api/viewsets/task.py
+++ b/src/backend/core/api/viewsets/task.py
@@ -1,9 +1,9 @@
-"""API ViewSet for Celery task status."""
+"""API ViewSet for asynchronous task statuses."""
import logging
-from celery import states as celery_states
-from celery.result import AsyncResult
+import dramatiq
+from dramatiq.results import ResultFailure, ResultMissing, ResultTimeout
from drf_spectacular.utils import (
OpenApiExample,
extend_schema,
@@ -14,10 +14,13 @@
from rest_framework.response import Response
from rest_framework.views import APIView
-from messages.celery_app import app as celery_app
+from core.utils import get_task_progress
logger = logging.getLogger(__name__)
+# Celery-like task states this endpoint can report, kept for frontend compatibility
+DRAMATIQ_STATES = ["PENDING", "SUCCESS", "FAILURE", "RETRY", "REJECTED", "PROGRESS"]
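+# Dramatiq has no task-state registry like Celery's; the states above are
+# synthesized below from the results backend and the progress store.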
+
@extend_schema(
tags=["tasks"],
@@ -34,9 +37,7 @@
200: inline_serializer(
name="TaskStatusResponse",
fields={
- "status": drf_serializers.ChoiceField(
- choices=sorted({*celery_states.ALL_STATES, "PROGRESS"})
- ),
+ "status": drf_serializers.ChoiceField(choices=sorted(DRAMATIQ_STATES)),
"result": drf_serializers.JSONField(allow_null=True),
"error": drf_serializers.CharField(allow_null=True),
},
@@ -59,38 +60,65 @@
],
)
class TaskDetailView(APIView):
- """View to retrieve the status of a Celery task."""
+ """View to retrieve the status of a task."""
permission_classes = [permissions.IsAuthenticated]
def get(self, request, task_id):
- """Get the status of a Celery task."""
-
- task_result = AsyncResult(task_id, app=celery_app)
+ """Get the status of a task."""
+        # Try to fetch a result from Dramatiq's result backend
+        # (get_results_backend raises when no Results middleware is configured)
+        broker = dramatiq.get_broker()
+        try:
+            result_backend = broker.get_results_backend()
+        except RuntimeError:
+            result_backend = None
- # By default unknown tasks will be in PENDING. There is no reliable
- # way to check if a task exists or not with Celery.
- # https://github.com/celery/celery/issues/3596#issuecomment-262102185
+ if result_backend is not None:
+ try:
+                # The results backend keys results by message id, so a minimal
+                # Message stub carrying this task id is enough to look one up.
+                message = dramatiq.Message(
+                    queue_name="default",
+                    actor_name="",
+                    args=(),
+                    kwargs={},
+                    options={},
+                    message_id=task_id,
+                )
+                result = result_backend.get_result(message, block=False)
+ return Response(
+ {
+ "status": "SUCCESS",
+ "result": result,
+ "error": None,
+ }
+ )
+ except ResultMissing:
+ # No result yet; fall through to progress/pending logic
+ pass
+ except ResultFailure as exc:
+ return Response(
+ {
+ "status": "FAILURE",
+ "result": None,
+ "error": str(exc),
+ }
+ )
+ except ResultTimeout as exc:
+ # Treat timeouts as pending
+ logger.debug("Result timeout for task %s: %s", task_id, exc)
- # Prepare the response data
- result_data = {
- "status": task_result.status,
- "result": None,
- "error": None,
- }
+ # Check if we have progress data for this task
+ progress_data = get_task_progress(task_id)
- # If the result is a dict with status/result/error, unpack and propagate status
- if isinstance(task_result.result, dict) and set(task_result.result.keys()) >= {
- "status",
- "result",
- "error",
- }:
- result_data["status"] = task_result.result["status"]
- result_data["result"] = task_result.result["result"]
- result_data["error"] = task_result.result["error"]
- else:
- result_data["result"] = task_result.result
- if task_result.state == "PROGRESS" and task_result.info:
- result_data.update(task_result.info)
+ if progress_data:
+ # Task is in progress
+ return Response(
+ {
+ "status": "PROGRESS",
+ "result": None,
+ "error": None,
+ "progress": progress_data.get("progress"),
+ "message": progress_data.get("metadata", {}).get("message"),
+ "timestamp": progress_data.get("timestamp"),
+ }
+ )
- return Response(result_data)
+ # Default to pending when no result and no progress
+ return Response(
+ {
+ "status": "PENDING",
+ "result": None,
+ "error": None,
+ }
+ )
diff --git a/src/backend/core/management/commands/run_task.py b/src/backend/core/management/commands/run_task.py
index dc9f2f458..5be7d634c 100644
--- a/src/backend/core/management/commands/run_task.py
+++ b/src/backend/core/management/commands/run_task.py
@@ -1,8 +1,8 @@
"""
-Management command to run arbitrary Celery tasks synchronously.
+Management command to run arbitrary Dramatiq actors synchronously.
-This command provides a Django interface to run Celery tasks with the same
-CLI flags as the main Celery CLI, but executes them synchronously instead
+This command provides a Django interface to run Dramatiq actors with the same
+CLI flags as the main Dramatiq CLI, but executes them synchronously instead
of queuing them as background tasks.
"""
@@ -16,21 +16,21 @@
class Command(BaseCommand):
- """Run arbitrary Celery tasks synchronously."""
+ """Run arbitrary Dramatiq actors synchronously."""
help = """
- Run arbitrary Celery tasks synchronously.
+ Run arbitrary Dramatiq actors synchronously.
Examples:
- python manage.py run_task fetch_service_metrics
- python manage.py run_task fetch_metrics_for_service --pargs '["123e4567-e89b-12d3-a456-426614174000"]'
- python manage.py run_task fetch_service_metrics --kwargs '{"debug": true}'
- python manage.py run_task other_app.tasks.some_task
+ python manage.py run_task send_message_task
+ python manage.py run_task send_message_task --pargs '["123e4567-e89b-12d3-a456-426614174000"]'
+ python manage.py run_task send_message_task --kwargs '{"must_archive": true}'
+ python manage.py run_task core.mda.tasks.send_message_task
"""
def add_arguments(self, parser):
"""Add command line arguments."""
- parser.add_argument("task_name", help="Name of the Celery task to run")
+ parser.add_argument("task_name", help="Name of the Dramatiq actor to run")
# Task execution options
parser.add_argument(
@@ -76,7 +76,7 @@ def handle(self, *args, **options):
try:
# Execute task synchronously
- result = task_func.apply(args=task_args, kwargs=kwargs)
+ result = task_func(*task_args, **kwargs)
# Output result
if options["json"]:
@@ -92,8 +92,18 @@ def handle(self, *args, **options):
def _get_task_function(self, task_name: str):
"""Get the task function by name using dynamic imports."""
try:
- # Try to import from core.tasks first
- tasks_module = importlib.import_module("core.tasks")
+ # Try to import from core.mda.tasks first
+ tasks_module = importlib.import_module("core.mda.tasks")
+ if hasattr(tasks_module, task_name):
+ return getattr(tasks_module, task_name)
+
+ # Try core.services.search.tasks
+ tasks_module = importlib.import_module("core.services.search.tasks")
+ if hasattr(tasks_module, task_name):
+ return getattr(tasks_module, task_name)
+
+ # Try core.services.importer.tasks
+ tasks_module = importlib.import_module("core.services.importer.tasks")
if hasattr(tasks_module, task_name):
return getattr(tasks_module, task_name)
@@ -105,7 +115,7 @@ def _get_task_function(self, task_name: str):
return getattr(module, func_name)
self.stdout.write(
- self.style.WARNING(f"Task '{task_name}' not found in core.tasks module")
+ self.style.WARNING(f"Task '{task_name}' not found in any tasks module")
)
return None
diff --git a/src/backend/core/management/commands/search_reindex.py b/src/backend/core/management/commands/search_reindex.py
deleted file mode 100644
index a6a9d51e6..000000000
--- a/src/backend/core/management/commands/search_reindex.py
+++ /dev/null
@@ -1,157 +0,0 @@
-"""Management command to reindex content in OpenSearch."""
-
-import uuid
-
-from django.core.management.base import BaseCommand, CommandError
-
-from core import models
-from core.services.search import create_index_if_not_exists
-from core.services.search.tasks import (
- _reindex_all_base,
- reindex_all,
- reindex_mailbox_task,
- reindex_thread_task,
-)
-
-
-class Command(BaseCommand):
- """Reindex content in OpenSearch."""
-
- help = "Reindex content in OpenSearch"
-
- def add_arguments(self, parser):
- """Add command arguments."""
- # Define a mutually exclusive group for the reindex scope
- group = parser.add_mutually_exclusive_group(required=True)
- group.add_argument(
- "--all",
- action="store_true",
- help="Reindex all threads and messages",
- )
- group.add_argument(
- "--thread",
- type=str,
- help="Reindex a specific thread by ID",
- )
- group.add_argument(
- "--mailbox",
- type=str,
- help="Reindex all threads and messages in a specific mailbox by ID",
- )
-
- # Async option
- parser.add_argument(
- "--async",
- action="store_true",
- help="Run task asynchronously",
- dest="async_mode",
- )
-
- # Whether to recreate the index
- parser.add_argument(
- "--recreate-index",
- action="store_true",
- help="Recreate the index before reindexing",
- )
-
- def handle(self, *args, **options):
- """Execute the command."""
- # Ensure index exists
- self.stdout.write("Ensuring OpenSearch index exists...")
- create_index_if_not_exists()
-
- # Handle reindexing based on scope
- if options["all"]:
- self._reindex_all(options["async_mode"])
- elif options["thread"]:
- self._reindex_thread(options["thread"], options["async_mode"])
- elif options["mailbox"]:
- self._reindex_mailbox(options["mailbox"], options["async_mode"])
-
- def _reindex_all(self, async_mode):
- """Reindex all threads and messages."""
- self.stdout.write("Reindexing all threads and messages...")
-
- if async_mode:
- task = reindex_all.delay()
- self.stdout.write(
- self.style.SUCCESS(f"Reindexing task scheduled (ID: {task.id})")
- )
- else:
- # For synchronous execution, use the base function directly
- def update_progress(current, total, success_count, failure_count):
- """Update progress in the console."""
- self.stdout.write(
- f"Progress: {current}/{total} threads processed "
- f"({success_count} succeeded, {failure_count} failed)"
- )
-
- result = _reindex_all_base(update_progress)
- self.stdout.write(
- self.style.SUCCESS(
- f"Reindexing completed: {result.get('success_count', 0)} succeeded, "
- f"{result.get('failure_count', 0)} failed"
- )
- )
- if result.get("failure_count", 0) > 0:
- return 1
-
- def _reindex_thread(self, thread_id, async_mode):
- """Reindex a specific thread and its messages."""
- try:
- thread_uuid = uuid.UUID(thread_id)
- # Verify thread exists
- models.Thread.objects.get(id=thread_uuid)
- except ValueError as e:
- raise CommandError(f"Invalid thread ID: {thread_id}") from e
- except models.Thread.DoesNotExist as e:
- raise CommandError(f"Thread with ID {thread_id} does not exist") from e
-
- self.stdout.write(f"Reindexing thread {thread_id}...")
-
- if async_mode:
- task = reindex_thread_task.delay(thread_id)
- self.stdout.write(
- self.style.SUCCESS(f"Reindexing task scheduled (ID: {task.id})")
- )
- else:
- result = reindex_thread_task(thread_id)
- if result.get("success", False):
- self.stdout.write(
- self.style.SUCCESS(f"Thread {thread_id} indexed successfully")
- )
- else:
- self.stdout.write(
- self.style.ERROR(
- f"Failed to index thread {thread_id}: {result.get('error', '')}"
- )
- )
- return 1
-
- def _reindex_mailbox(self, mailbox_id, async_mode):
- """Reindex all threads and messages in a specific mailbox."""
- try:
- mailbox_uuid = uuid.UUID(mailbox_id)
- mailbox = models.Mailbox.objects.get(id=mailbox_uuid)
- except ValueError as e:
- raise CommandError(f"Invalid mailbox ID: {mailbox_id}") from e
- except models.Mailbox.DoesNotExist as e:
- raise CommandError(f"Mailbox with ID {mailbox_id} does not exist") from e
-
- self.stdout.write(f"Reindexing threads for mailbox {mailbox}...")
-
- if async_mode:
- task = reindex_mailbox_task.delay(mailbox_id)
- self.stdout.write(
- self.style.SUCCESS(f"Reindexing task scheduled (ID: {task.id})")
- )
- else:
- result = reindex_mailbox_task(mailbox_id)
- self.stdout.write(
- self.style.SUCCESS(
- f"Reindexing completed: {result.get('success_count', 0)} succeeded, "
- f"{result.get('failure_count', 0)} failed"
- )
- )
- if result.get("failure_count", 0) > 0:
- return 1
diff --git a/src/backend/core/management/commands/worker.py b/src/backend/core/management/commands/worker.py
new file mode 100644
index 000000000..db6ac8d5c
--- /dev/null
+++ b/src/backend/core/management/commands/worker.py
@@ -0,0 +1,10 @@
+"""Management command wrapping django-dramatiq's ``rundramatiq`` worker."""
+
+from django_dramatiq.management.commands.rundramatiq import (
+    Command as RunDramatiqCommand,
+)
+
+
+class Command(RunDramatiqCommand):
+    """Run the Dramatiq worker processes."""
+
+    def discover_tasks_modules(self):
+        """Swap django-dramatiq's default setup module for our own."""
+        tasks_modules = super().discover_tasks_modules()
+        tasks_modules[0] = "core.worker_setup"
+        return tasks_modules
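+
+# core.worker_setup is not shown in this diff; it presumably mirrors
+# django_dramatiq's default setup module (which calls django.setup()) and
+# then imports the task modules so their actors register, e.g.:
+#
+#   import django
+#   django.setup()
+#
+#   import core.mda.tasks  # noqa: F401  (hypothetical import list)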
diff --git a/src/backend/core/mda/outbound.py b/src/backend/core/mda/outbound.py
index 55d1a63dc..4d29f5b56 100644
--- a/src/backend/core/mda/outbound.py
+++ b/src/backend/core/mda/outbound.py
@@ -229,7 +229,7 @@ def prepare_outbound_message(
def send_message(message: models.Message, force_mta_out: bool = False):
"""Send an existing Message, internally or externally.
- This part is called asynchronously from the celery worker.
+ This part is called asynchronously from the worker.
"""
# Refuse to send messages that are draft or not senders
diff --git a/src/backend/core/mda/tasks.py b/src/backend/core/mda/tasks.py
index 4e948aec7..5d386f762 100644
--- a/src/backend/core/mda/tasks.py
+++ b/src/backend/core/mda/tasks.py
@@ -5,81 +5,74 @@
from django.db.models import Q
from django.utils import timezone
-from celery.utils.log import get_task_logger
+import dramatiq
+from dramatiq_crontab import cron
from core import models
from core.enums import MessageDeliveryStatusChoices
from core.mda.outbound import send_message
from core.mda.selfcheck import run_selfcheck
+from core.utils import register_task, set_task_progress
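+# register_task and set_task_progress are project helpers in core.utils:
+# register_task is assumed to wrap dramatiq.actor, and set_task_progress to
+# persist progress keyed by the current message id (read back via
+# get_task_progress in the task status API).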
-from messages.celery_app import app as celery_app
+logger = dramatiq.get_logger(__name__)
-logger = get_task_logger(__name__)
-
-@celery_app.task(bind=True)
-def send_message_task(self, message_id, force_mta_out=False, must_archive=False):
+@register_task
+def send_message_task(message_id, force_mta_out=False, must_archive=False):
"""Send a message asynchronously.
Args:
message_id: The ID of the message to send
force_mta_out: Whether to force sending via MTA
+ must_archive: Whether to archive the thread after sending
Returns:
dict: A dictionary with success status and info
"""
- try:
- message = (
- models.Message.objects.select_related("thread", "sender")
- .prefetch_related("recipients__contact")
- .get(id=message_id)
- )
+ set_task_progress(0, {"message": "Starting message send"})
- send_message(message, force_mta_out)
+ message = (
+ models.Message.objects.select_related("thread", "sender")
+ .prefetch_related("recipients__contact")
+ .get(id=message_id)
+ )
- # Update task state with progress information
- self.update_state(
- state="SUCCESS",
- meta={
- "status": "completed", # TODO fetch recipients statuses
- "message_id": str(message_id),
- "success": True,
- },
- )
+ set_task_progress(25, {"message": "Message loaded, sending..."})
- # If requested, archive the whole thread after sending
- if must_archive:
- try:
- thread = message.thread
- models.Message.objects.filter(thread=thread).update(
- is_archived=True, archived_at=timezone.now()
- )
- thread.update_stats()
- except Exception as e:
- # Not critical, just log the error
- logger.exception(
- "Error in send_message_task when archiving thread %s after sending message %s: %s",
- thread.id,
- message_id,
- e,
- )
-
- return {
- "message_id": str(message_id),
- "success": True,
- }
- # pylint: disable=broad-exception-caught
- except Exception as e:
- logger.exception("Error in send_message_task for message %s: %s", message_id, e)
- self.update_state(
- state="FAILURE",
- meta={"status": "failed", "message_id": str(message_id), "error": str(e)},
- )
- raise
+ send_message(message, force_mta_out)
+ set_task_progress(75, {"message": "Message sent, processing archive..."})
-@celery_app.task(bind=True)
-def selfcheck_task(self):
+ # If requested, archive the whole thread after sending
+ if must_archive:
+ try:
+ thread = message.thread
+ models.Message.objects.filter(thread=thread).update(
+ is_archived=True, archived_at=timezone.now()
+ )
+ thread.update_stats()
+ set_task_progress(90, {"message": "Thread archived"})
+ except Exception as e:
+ # Not critical, just log the error
+ logger.exception(
+ "Error in send_message_task when archiving thread %s after sending message %s: %s",
+ thread.id,
+ message_id,
+ e,
+ )
+
+ result = {
+ "message_id": str(message_id),
+ "success": True,
+ }
+
+ set_task_progress(100, {"message": "Message sent successfully"})
+ return result
+
+
+@cron("0 */6 * * *") # Every 6 hours
+@register_task
+def selfcheck_task():
"""Run a selfcheck of the mail delivery system.
This task performs an end-to-end test of the mail delivery pipeline:
@@ -94,33 +87,13 @@ def selfcheck_task(self):
Returns:
dict: A dictionary with success status, timings, and metrics
"""
- try:
- result = run_selfcheck()
-
- # Update task state with progress information
- self.update_state(
- state="SUCCESS",
- meta={
- "status": "completed",
- "success": result["success"],
- "send_time": result["send_time"],
- "reception_time": result["reception_time"],
- },
- )
+ result = run_selfcheck()
+ return result
- return result
- # pylint: disable=broad-exception-caught
- except Exception as e:
- logger.exception("Error in selfcheck_task: %s", e)
- self.update_state(
- state="FAILURE",
- meta={"status": "failed", "error": str(e)},
- )
- raise
-
-@celery_app.task(bind=True)
-def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=100):
+@cron("*/5 * * * *") # Every 5 minutes
+@register_task
+def retry_messages_task(message_id=None, force_mta_out=False, batch_size=100):
"""Retry sending messages with retryable recipients (respects retry timing).
Args:
@@ -131,6 +104,9 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1
Returns:
dict: A dictionary with task status and results
"""
+
+ set_task_progress(0, {"message": "Finding messages to retry"})
+
# Get messages to process
if message_id:
# Single message mode
@@ -169,7 +145,7 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1
total_messages = len(messages_to_process)
if total_messages == 0:
- return {
+ result = {
"success": True,
"total_messages": 0,
"processed_messages": 0,
@@ -177,6 +153,16 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1
"error_count": 0,
"message": "No messages ready for retry",
}
+ set_task_progress(100, {"message": "No messages to retry"})
+ return result
+
+ set_task_progress(
+ 10,
+ {
+ "message": f"Found {total_messages} messages to retry",
+ "total_messages": total_messages,
+ },
+ )
# Process messages in batches
processed_count = 0
@@ -186,19 +172,20 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1
for batch_start in range(0, total_messages, batch_size):
batch_messages = messages_to_process[batch_start : batch_start + batch_size]
- # Update progress for bulk operations
- if not message_id:
- self.update_state(
- state="PROGRESS",
- meta={
- "current_batch": batch_start // batch_size + 1,
- "total_batches": (total_messages + batch_size - 1) // batch_size,
- "processed_messages": processed_count,
- "total_messages": total_messages,
- "success_count": success_count,
- "error_count": error_count,
- },
- )
+ # Update progress for batch processing
+ progress_percentage = min(10 + (batch_start / total_messages) * 80, 90)
+ set_task_progress(
+ int(progress_percentage),
+ {
+ "message": f"Processing batch {batch_start // batch_size + 1}",
+ "current_batch": batch_start // batch_size + 1,
+ "total_batches": (total_messages + batch_size - 1) // batch_size,
+ "processed_messages": processed_count,
+ "total_messages": total_messages,
+ "success_count": success_count,
+ "error_count": error_count,
+ },
+ )
for message in batch_messages:
try:
@@ -227,7 +214,7 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1
# Return appropriate result format
if message_id:
- return {
+ result = {
"success": True,
"message_id": str(message_id),
"recipients_processed": success_count,
@@ -235,11 +222,19 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1
"success_count": success_count,
"error_count": error_count,
}
+ else:
+ result = {
+ "success": True,
+ "total_messages": total_messages,
+ "processed_messages": processed_count,
+ "success_count": success_count,
+ "error_count": error_count,
+ }
- return {
- "success": True,
- "total_messages": total_messages,
- "processed_messages": processed_count,
- "success_count": success_count,
- "error_count": error_count,
- }
+ set_task_progress(
+ 100,
+ {
+ "message": f"Retry completed: {success_count} succeeded, {error_count} failed"
+ },
+ )
+ return result
diff --git a/src/backend/core/services/dns/tasks.py b/src/backend/core/services/dns/tasks.py
deleted file mode 100644
index a933cc4da..000000000
--- a/src/backend/core/services/dns/tasks.py
+++ /dev/null
@@ -1,17 +0,0 @@
-"""DNS tasks."""
-
-# pylint: disable=unused-argument, broad-exception-raised, broad-exception-caught, too-many-lines
-
-# @celery_app.task(bind=True)
-# def check_maildomain_dns(self, maildomain_id):
-# """Check if the DNS records for a mail domain are correct."""
-
-# maildomain = models.MailDomain.objects.get(id=maildomain_id)
-# expected_records = maildomain.get_expected_dns_records()
-# for record in expected_records:
-# res = dns.resolver.resolve(
-# record["target"], record["type"], raise_on_no_answer=False, lifetime=10
-# )
-# print(res)
-# print(record)
-# return {"success": True}
diff --git a/src/backend/core/services/importer/imap.py b/src/backend/core/services/importer/imap.py
index 7b33e3aae..2be179ee7 100644
--- a/src/backend/core/services/importer/imap.py
+++ b/src/backend/core/services/importer/imap.py
@@ -3,6 +3,7 @@
import base64
import codecs
import imaplib
+import logging
import re
import socket
import time
@@ -10,12 +11,11 @@
from django.conf import settings
-from celery.utils.log import get_task_logger
-
from core.mda.inbound import deliver_inbound_message
from core.mda.rfc5322 import parse_email_message
+from core.utils import set_task_progress
-logger = get_task_logger(__name__)
+logger = logging.getLogger(__name__)
def decode_imap_utf7(s):
@@ -396,7 +396,6 @@ def process_folder_messages( # pylint: disable=too-many-arguments
message_list: List[bytes],
recipient: Any,
username: str,
- task_instance: Any,
success_count: int,
failure_count: int,
current_message: int,
@@ -443,17 +442,17 @@ def process_folder_messages( # pylint: disable=too-many-arguments
# Update task state after processing the message
message_status = f"Processing message {current_message} of {total_messages}"
- result = {
- "message_status": message_status,
- "total_messages": total_messages,
- "success_count": success_count,
- "failure_count": failure_count,
- "type": "imap",
- "current_message": current_message,
- }
- task_instance.update_state(
- state="PROGRESS",
- meta={"result": result, "error": None},
+
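+    # Progress percentage is unknown at this level, so pass None and let the
+    # metadata carry the counters.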
+ set_task_progress(
+ None,
+ {
+ "message_status": message_status,
+ "total_messages": total_messages,
+ "success_count": success_count,
+ "failure_count": failure_count,
+ "type": "imap",
+ "current_message": current_message,
+ },
)
return success_count, failure_count, current_message
diff --git a/src/backend/core/services/importer/service.py b/src/backend/core/services/importer/service.py
index 980a934c8..e89491125 100644
--- a/src/backend/core/services/importer/service.py
+++ b/src/backend/core/services/importer/service.py
@@ -69,25 +69,25 @@ def import_file(
# Check MIME type for MBOX
if unsafe_content_type in enums.MBOX_SUPPORTED_MIME_TYPES:
# Process MBOX file asynchronously
- task = process_mbox_file_task.delay(file_key, str(recipient.id))
- response_data = {"task_id": task.id, "type": "mbox"}
+ task = process_mbox_file_task.send(file_key, str(recipient.id))
+ response_data = {"task_id": task.message_id, "type": "mbox"}
if request:
messages.info(
request,
f"Started processing MBOX file for recipient {recipient}. "
- "This may take a while. You can check the status in the Celery task monitor.",
+ "This may take a while.",
)
return True, response_data
# Check MIME type for EML
elif unsafe_content_type in enums.EML_SUPPORTED_MIME_TYPES:
# Process EML file asynchronously
- task = process_eml_file_task.delay(file_key, str(recipient.id))
- response_data = {"task_id": task.id, "type": "eml"}
+ task = process_eml_file_task.send(file_key, str(recipient.id))
+ response_data = {"task_id": task.message_id, "type": "eml"}
if request:
messages.info(
request,
f"Started processing EML file for recipient {recipient}. "
- "This may take a while. You can check the status in the Celery task monitor.",
+ "This may take a while.",
)
return True, response_data
except Exception as e:
@@ -129,7 +129,7 @@ def import_imap(
try:
# Start the import task
- task = import_imap_messages_task.delay(
+ task = import_imap_messages_task.send(
imap_server=imap_server,
imap_port=imap_port,
username=username,
@@ -137,12 +137,12 @@ def import_imap(
use_ssl=use_ssl,
recipient_id=str(recipient.id),
)
- response_data = {"task_id": task.id, "type": "imap"}
+ response_data = {"task_id": task.message_id, "type": "imap"}
if request:
messages.info(
request,
f"Started importing messages from IMAP server for recipient {recipient}. "
- "This may take a while. You can check the status in the Celery task monitor.",
+ "This may take a while.",
)
return True, response_data
diff --git a/src/backend/core/services/importer/tasks.py b/src/backend/core/services/importer/tasks.py
index a000022ab..52c842553 100644
--- a/src/backend/core/services/importer/tasks.py
+++ b/src/backend/core/services/importer/tasks.py
@@ -5,15 +5,14 @@
from django.core.files.storage import storages
+import dramatiq
import magic
-from celery.utils.log import get_task_logger
from core import enums
from core.mda.inbound import deliver_inbound_message
from core.mda.rfc5322 import parse_email_message
from core.models import Mailbox
-
-from messages.celery_app import app as celery_app
+from core.utils import register_task, set_task_progress
from .imap import (
IMAPConnectionManager,
@@ -24,11 +23,11 @@
select_imap_folder,
)
-logger = get_task_logger(__name__)
+logger = dramatiq.get_logger(__name__)
-@celery_app.task(bind=True)
-def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str, Any]:
+@register_task
+def process_mbox_file_task(file_key: str, recipient_id: str) -> Dict[str, Any]:
"""
Process a MBOX file asynchronously.
@@ -44,6 +43,8 @@ def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str,
total_messages = 0
current_message = 0
+ set_task_progress(0, {"message": "Loading recipient mailbox"})
+
try:
recipient = Mailbox.objects.get(id=recipient_id)
except Mailbox.DoesNotExist:
@@ -56,13 +57,6 @@ def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str,
"type": "mbox",
"current_message": 0,
}
- self.update_state(
- state="FAILURE",
- meta={
- "result": result,
- "error": error_msg,
- },
- )
return {
"status": "FAILURE",
"result": result,
@@ -70,53 +64,56 @@ def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str,
}
try:
+ set_task_progress(5, {"message": "Opening MBOX file"})
+
# Get storage and open file
message_imports_storage = storages["message-imports"]
with message_imports_storage.open(file_key, "rb") as file:
- self.update_state(
- state="PROGRESS",
- meta={
- "result": {
- "message_status": "Initializing import",
- "type": "mbox",
- },
- "error": None,
- },
- )
+ set_task_progress(10, {"message": "Validating file format"})
+
# Ensure file is a valid mbox file
content_type = magic.from_buffer(file.read(2048), mime=True)
if content_type not in enums.MBOX_SUPPORTED_MIME_TYPES:
raise Exception(f"Expected MBOX file, got {content_type}")
+ set_task_progress(15, {"message": "Counting messages"})
+
# First pass: count total messages
file.seek(0)
total_messages = count_mbox_messages(file)
+ set_task_progress(
+ 20,
+ {
+ "message": f"Found {total_messages} messages, starting processing",
+ "total_messages": total_messages,
+ },
+ )
+
# Reset file pointer
file.seek(0)
# Second pass: process messages
for i, message_content in enumerate(stream_mbox_messages(file), 1):
current_message = i
- try:
- # Update task state with progress
- result = {
- "message_status": f"Processing message {i} of {total_messages}",
- "total_messages": total_messages,
- "success_count": success_count,
- "failure_count": failure_count,
- "type": "mbox",
- "current_message": i,
- }
- self.update_state(
- state="PROGRESS",
- meta={
- "result": result,
- "error": None,
+
+ # Update progress every 100 messages or at the end
+ if i % 100 == 0 or i == total_messages:
+ progress_percentage = min(20 + (i / total_messages) * 70, 90)
+ set_task_progress(
+ int(progress_percentage),
+ {
+ "message": f"Processing message {i} of {total_messages}",
+ "total_messages": total_messages,
+ "success_count": success_count,
+ "failure_count": failure_count,
+ "type": "mbox",
+ "current_message": i,
},
)
+ try:
# Parse the email message
parsed_email = parse_email_message(message_content)
# Deliver the message
@@ -143,13 +140,7 @@ def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str,
"current_message": current_message,
}
- self.update_state(
- state="SUCCESS",
- meta={
- "result": result,
- "error": None,
- },
- )
+ set_task_progress(100, {"message": "MBOX processing completed successfully"})
return {
"status": "SUCCESS",
@@ -172,13 +163,6 @@ def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str,
"type": "mbox",
"current_message": current_message,
}
- self.update_state(
- state="FAILURE",
- meta={
- "result": result,
- "error": error_msg,
- },
- )
return {
"status": "FAILURE",
"result": result,
@@ -241,9 +225,8 @@ def stream_mbox_messages(file) -> Generator[bytes, None, None]:
yield message
-@celery_app.task(bind=True)
+@register_task
def import_imap_messages_task(
- self,
imap_server: str,
imap_port: int,
username: str,
@@ -270,6 +253,8 @@ def import_imap_messages_task(
current_message = 0
try:
+ set_task_progress(0, {"message": "Connecting to IMAP server"})
+
# Get recipient mailbox
recipient = Mailbox.objects.get(id=recipient_id)
@@ -324,7 +309,6 @@ def import_imap_messages_task(
message_list=message_list,
recipient=recipient,
username=username,
- task_instance=self,
success_count=success_count,
failure_count=failure_count,
current_message=current_message,
@@ -350,11 +334,6 @@ def import_imap_messages_task(
"current_message": current_message,
}
- self.update_state(
- state="SUCCESS",
- meta={"status": "SUCCESS", "result": result, "error": None},
- )
-
return {"status": "SUCCESS", "result": result, "error": None}
except Mailbox.DoesNotExist:
@@ -367,7 +346,6 @@ def import_imap_messages_task(
"type": "imap",
"current_message": 0,
}
- self.update_state(state="FAILURE", meta={"result": result, "error": error_msg})
return {"status": "FAILURE", "result": result, "error": error_msg}
except Exception as e:
@@ -382,12 +360,11 @@ def import_imap_messages_task(
"type": "imap",
"current_message": current_message,
}
- self.update_state(state="FAILURE", meta={"result": result, "error": error_msg})
return {"status": "FAILURE", "result": result, "error": error_msg}
-@celery_app.task(bind=True)
-def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, Any]:
+@register_task
+def process_eml_file_task(file_key: str, recipient_id: str) -> Dict[str, Any]:
"""
Process an EML file asynchronously.
@@ -398,6 +375,9 @@ def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, A
Returns:
Dict with task status and result
"""
+
+ set_task_progress(0, {"message": "Loading recipient mailbox"})
+
try:
recipient = Mailbox.objects.get(id=recipient_id)
except Mailbox.DoesNotExist:
@@ -410,36 +390,13 @@ def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, A
"type": "eml",
"current_message": 0,
}
- self.update_state(
- state="FAILURE",
- meta={
- "result": result,
- "error": error_msg,
- },
- )
return {
+ "status": "FAILURE",
"result": result,
"error": error_msg,
}
try:
- # Update progress state
- progress_result = {
- "message_status": "Processing message 1 of 1",
- "total_messages": 1,
- "success_count": 0,
- "failure_count": 0,
- "type": "eml",
- "current_message": 1,
- }
- self.update_state(
- state="PROGRESS",
- meta={
- "result": progress_result,
- "error": None,
- },
- )
-
# Get storage and read file
message_imports_storage = storages["message-imports"]
with message_imports_storage.open(file_key, "rb") as file:
@@ -467,13 +424,6 @@ def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, A
}
if success:
- self.update_state(
- state="SUCCESS",
- meta={
- "result": result,
- "error": None,
- },
- )
return {
"status": "SUCCESS",
"result": result,
@@ -481,13 +431,6 @@ def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, A
}
error_msg = "Failed to deliver message"
- self.update_state(
- state="FAILURE",
- meta={
- "result": result,
- "error": error_msg,
- },
- )
return {
"status": "FAILURE",
"result": result,
@@ -509,13 +452,6 @@ def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, A
"type": "eml",
"current_message": 1,
}
- self.update_state(
- state="FAILURE",
- meta={
- "result": result,
- "error": error_msg,
- },
- )
return {
"status": "FAILURE",
"result": result,
diff --git a/src/backend/core/services/search/__init__.py b/src/backend/core/services/search/__init__.py
index 51ff6886b..0a972ba38 100644
--- a/src/backend/core/services/search/__init__.py
+++ b/src/backend/core/services/search/__init__.py
@@ -6,9 +6,6 @@
get_opensearch_client,
index_message,
index_thread,
- reindex_all,
- reindex_mailbox,
- reindex_thread,
)
from core.services.search.mapping import MESSAGE_INDEX, MESSAGE_MAPPING
from core.services.search.parse import parse_search_query
@@ -25,9 +22,6 @@
# Indexing
"index_message",
"index_thread",
- "reindex_all",
- "reindex_mailbox",
- "reindex_thread",
# Parsing
"parse_search_query",
# Searching
diff --git a/src/backend/core/services/search/index.py b/src/backend/core/services/search/index.py
index 0f5f6ca8d..ba47de707 100644
--- a/src/backend/core/services/search/index.py
+++ b/src/backend/core/services/search/index.py
@@ -187,78 +187,3 @@ def index_thread(thread: models.Thread) -> bool:
except Exception as e:
logger.error("Error indexing thread %s: %s", thread.id, e)
return False
-
-
-def reindex_all():
- """Reindex all messages and threads."""
-
- # Delete and recreate the index
- delete_index()
- create_index_if_not_exists()
-
- # Count indexed items
- indexed_threads = 0
- indexed_messages = 0
-
- # Index all threads
- for thread in models.Thread.objects.all():
- if index_thread(thread):
- indexed_threads += 1
- indexed_messages += thread.messages.count()
-
- return {
- "status": "success",
- "indexed_threads": indexed_threads,
- "indexed_messages": indexed_messages,
- }
-
-
-def reindex_mailbox(mailbox_id: str):
- """Reindex all messages and threads for a specific mailbox."""
-
- # Count indexed items
- indexed_threads = 0
- indexed_messages = 0
-
- try:
- # Get the mailbox
- mailbox = models.Mailbox.objects.get(id=mailbox_id)
-
- # Index all threads the mailbox has access to
- for thread in mailbox.threads_viewer:
- if index_thread(thread):
- indexed_threads += 1
- indexed_messages += thread.messages.count()
-
- return {
- "status": "success",
- "mailbox": mailbox_id,
- "indexed_threads": indexed_threads,
- "indexed_messages": indexed_messages,
- }
- except models.Mailbox.DoesNotExist:
- return {"status": "error", "mailbox": mailbox_id, "error": "Mailbox not found"}
-
- # pylint: disable=broad-exception-caught
- except Exception as e:
- logger.error("Error reindexing mailbox %s: %s", mailbox_id, e)
- return {"status": "error", "mailbox": mailbox_id, "error": str(e)}
-
-
-def reindex_thread(thread_id: str):
- """Reindex a specific thread."""
-
- try:
- thread = models.Thread.objects.get(id=thread_id)
- success = index_thread(thread)
-
- return {
- "status": "success" if success else "error",
- "thread": thread_id,
- "indexed_messages": thread.messages.count() if success else 0,
- }
- except models.Thread.DoesNotExist:
- return {"status": "error", "thread": thread_id, "error": "Thread not found"}
- # pylint: disable=broad-exception-caught
- except Exception as e:
- return {"status": "error", "thread": thread_id, "error": str(e)}
diff --git a/src/backend/core/services/search/tasks.py b/src/backend/core/services/search/tasks.py
index 9600b77d3..4968c953e 100644
--- a/src/backend/core/services/search/tasks.py
+++ b/src/backend/core/services/search/tasks.py
@@ -4,7 +4,7 @@
from django.conf import settings
-from celery.utils.log import get_task_logger
+import dramatiq
from core import models
from core.services.search import (
@@ -13,18 +13,14 @@
index_message,
index_thread,
)
+from core.utils import register_task
-from messages.celery_app import app as celery_app
+logger = dramatiq.get_logger(__name__)
-logger = get_task_logger(__name__)
-
-def _reindex_all_base(update_progress=None):
- """Base function for reindexing all threads and messages.
-
- Args:
- update_progress: Optional callback function to update progress
- """
+@register_task
+def reindex_all_task():
+ """Reindex all threads and messages."""
if not settings.OPENSEARCH_INDEX_THREADS:
logger.info("OpenSearch thread indexing is disabled.")
return {"success": False, "reason": "disabled"}
@@ -50,8 +46,8 @@ def _reindex_all_base(update_progress=None):
logger.exception("Error indexing thread %s: %s", thread.id, e)
# Update progress if callback provided
- if update_progress and i % 100 == 0:
- update_progress(i, total, success_count, failure_count)
+ if i % 100 == 0:
+ logger.debug("Progress for all threads: %s of %s", i, total)
return {
"success": True,
@@ -61,27 +57,8 @@ def _reindex_all_base(update_progress=None):
}
-@celery_app.task(bind=True)
-def reindex_all(self):
- """Celery task wrapper for reindexing all threads and messages."""
-
- def update_progress(current, total, success_count, failure_count):
- """Update task progress."""
- self.update_state(
- state="PROGRESS",
- meta={
- "current": current,
- "total": total,
- "success_count": success_count,
- "failure_count": failure_count,
- },
- )
-
- return _reindex_all_base(update_progress)
-
-
-@celery_app.task(bind=True)
-def reindex_thread_task(self, thread_id):
+@register_task
+def reindex_thread_task(thread_id):
"""Reindex a specific thread and all its messages."""
if not settings.OPENSEARCH_INDEX_THREADS:
logger.info("OpenSearch thread indexing is disabled.")
@@ -113,8 +90,8 @@ def reindex_thread_task(self, thread_id):
raise
-@celery_app.task(bind=True)
-def reindex_mailbox_task(self, mailbox_id):
+@register_task
+def reindex_mailbox_task(mailbox_id):
"""Reindex all threads and messages in a specific mailbox."""
if not settings.OPENSEARCH_INDEX_THREADS:
logger.info("OpenSearch thread indexing is disabled.")
@@ -142,14 +119,8 @@ def reindex_mailbox_task(self, mailbox_id):
# Update progress every 50 threads
if i % 50 == 0:
- self.update_state(
- state="PROGRESS",
- meta={
- "current": i,
- "total": total,
- "success_count": success_count,
- "failure_count": failure_count,
- },
+ logger.debug(
+ "Updating progress for mailbox %s: %s of %s", mailbox_id, i, total
)
return {
@@ -161,8 +132,8 @@ def reindex_mailbox_task(self, mailbox_id):
}
-@celery_app.task(bind=True)
-def index_message_task(self, message_id):
+@register_task
+def index_message_task(message_id):
"""Index a single message."""
if not settings.OPENSEARCH_INDEX_THREADS:
logger.info("OpenSearch message indexing is disabled.")
@@ -201,10 +172,9 @@ def index_message_task(self, message_id):
raise
-@celery_app.task(bind=True)
-def reset_search_index(self):
+@register_task
+def reset_search_index():
"""Delete and recreate the OpenSearch index."""
-
delete_index()
create_index_if_not_exists()
return {"success": True}
diff --git a/src/backend/core/signals.py b/src/backend/core/signals.py
index e95b5ed0f..b4d668533 100644
--- a/src/backend/core/signals.py
+++ b/src/backend/core/signals.py
@@ -53,8 +53,8 @@ def index_message_post_save(sender, instance, created, **kwargs):
try:
# Schedule the indexing task asynchronously
- index_message_task.delay(str(instance.id))
- # reindex_thread_task.delay(str(instance.thread.id))
+ index_message_task.send(str(instance.id))
+ # reindex_thread_task.send(str(instance.thread.id))
# pylint: disable=broad-exception-caught
except Exception as e:
@@ -74,7 +74,7 @@ def index_message_recipient_post_save(sender, instance, created, **kwargs):
try:
# Schedule the indexing task asynchronously
# TODO: deduplicate the indexing of the message!
- index_message_task.delay(str(instance.message.id))
+ index_message_task.send(str(instance.message.id))
# pylint: disable=broad-exception-caught
except Exception as e:
@@ -93,7 +93,7 @@ def index_thread_post_save(sender, instance, created, **kwargs):
try:
# Schedule the indexing task asynchronously
- reindex_thread_task.delay(str(instance.id))
+ reindex_thread_task.send(str(instance.id))
# pylint: disable=broad-exception-caught
except Exception as e:
diff --git a/src/backend/core/tasks.py b/src/backend/core/tasks.py
index 02986e842..050499760 100644
--- a/src/backend/core/tasks.py
+++ b/src/backend/core/tasks.py
@@ -1,7 +1,6 @@
# pylint: disable=wildcard-import, unused-wildcard-import
-"""Register all tasks here so that Celery autodiscovery can find them."""
+"""Register all tasks here so that Dramatiq autodiscovery can find them."""
-from core.mda.tasks import * # noqa: F403
-from core.services.dns.tasks import * # noqa: F403
-from core.services.importer.tasks import * # noqa: F403
-from core.services.search.tasks import * # noqa: F403
+# from core.mda.tasks import *
+# from core.services.importer.tasks import *
+# from core.services.search.tasks import *
diff --git a/src/backend/core/templates/admin/index.html b/src/backend/core/templates/admin/index.html
new file mode 100644
index 000000000..e791196b9
--- /dev/null
+++ b/src/backend/core/templates/admin/index.html
@@ -0,0 +1,19 @@
+{% extends "admin/index.html" %}
+
+{% block content %}
+{{ block.super }}
+
+{% if user.is_staff %}
+{# Staff-only link to the Dramatiq dashboard; URL injected by CoreAdminSite.index #}
+<div class="module">
+  <h2>Background tasks</h2>
+  <p><a href="{{ dramatiq_dashboard_url }}">Dramatiq dashboard</a></p>
+</div>
+{% endif %}
+{% endblock %}
\ No newline at end of file
diff --git a/src/backend/core/tests/api/test_messages_import.py b/src/backend/core/tests/api/test_messages_import.py
index 49899e1e1..babc7fd93 100644
--- a/src/backend/core/tests/api/test_messages_import.py
+++ b/src/backend/core/tests/api/test_messages_import.py
@@ -137,7 +137,7 @@ def test_api_import_eml_file(api_client, user, mailbox, eml_file):
)
-def test_api_import_mbox_file(api_client, user, mailbox, mbox_file):
+def test_api_import_mbox_file(api_client, user, mailbox, mbox_file, worker):
"""Test import of MBOX file."""
# add access to mailbox
mailbox.accesses.create(user=user, role=MailboxRoleChoices.ADMIN)
@@ -149,6 +149,9 @@ def test_api_import_mbox_file(api_client, user, mailbox, mbox_file):
)
assert response.status_code == 202
assert response.data["type"] == "mbox"
+
+ worker.join()
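+    # "worker" is assumed to be a dramatiq test-worker fixture (StubBroker-backed);
+    # join() blocks until the enqueued import task has been processed.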
+
# Verify messages were created
assert Message.objects.count() == 3
messages = Message.objects.order_by("created_at")
@@ -184,9 +187,7 @@ def test_api_import_mbox_async(api_client, user, mailbox, mbox_file):
"""Test import of MBOX file asynchronously."""
# add access to mailbox
mailbox.accesses.create(user=user, role=MailboxRoleChoices.ADMIN)
- with patch(
- "core.services.importer.tasks.process_mbox_file_task.delay"
- ) as mock_task:
+ with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id"
mock_task.return_value.status = "PENDING"
response = api_client.post(
@@ -217,7 +218,7 @@ def test_api_import_imap_task(api_client, user, mailbox):
"""Test import of IMAP messages."""
mailbox.accesses.create(user=user, role=MailboxRoleChoices.ADMIN)
with patch(
- "core.services.importer.tasks.import_imap_messages_task.delay"
+ "core.services.importer.tasks.import_imap_messages_task.send"
) as mock_task:
mock_task.return_value.id = "fake-task-id"
data = {
@@ -335,7 +336,7 @@ def test_api_import_duplicate_eml_file(api_client, user, mailbox, eml_file):
assert Thread.objects.count() == 0
# First import
- with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task:
+ with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id-1"
response = api_client.post(
IMPORT_FILE_URL,
@@ -360,7 +361,7 @@ def test_api_import_duplicate_eml_file(api_client, user, mailbox, eml_file):
assert Thread.objects.count() == 1
# Import again the same file
- with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task:
+ with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id-2"
response = api_client.post(
IMPORT_FILE_URL,
@@ -394,9 +395,7 @@ def test_api_import_duplicate_mbox_file(api_client, user, mailbox, mbox_file):
assert Thread.objects.count() == 0
# First import
- with patch(
- "core.services.importer.tasks.process_mbox_file_task.delay"
- ) as mock_task:
+ with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id-1"
response = api_client.post(
IMPORT_FILE_URL,
@@ -424,9 +423,7 @@ def test_api_import_duplicate_mbox_file(api_client, user, mailbox, mbox_file):
assert Thread.objects.count() == 2
# Second import of the same file
- with patch(
- "core.services.importer.tasks.process_mbox_file_task.delay"
- ) as mock_task:
+ with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id-2"
response = api_client.post(
IMPORT_FILE_URL,
@@ -466,7 +463,7 @@ def test_api_import_eml_same_message_different_mailboxes(api_client, user, eml_f
assert Message.objects.count() == 0
# Import to first mailbox
- with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task:
+ with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id-1"
response = api_client.post(
IMPORT_FILE_URL,
@@ -490,7 +487,7 @@ def test_api_import_eml_same_message_different_mailboxes(api_client, user, eml_f
assert Message.objects.count() == 1
# Import to second mailbox
- with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task:
+ with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id-2"
response = api_client.post(
IMPORT_FILE_URL,
@@ -537,9 +534,7 @@ def test_api_import_mbox_same_message_different_mailboxes(api_client, user, mbox
assert Message.objects.count() == 0
# Import to first mailbox
- with patch(
- "core.services.importer.tasks.process_mbox_file_task.delay"
- ) as mock_task:
+ with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id-1"
response = api_client.post(
IMPORT_FILE_URL,
@@ -563,9 +558,7 @@ def test_api_import_mbox_same_message_different_mailboxes(api_client, user, mbox
assert Message.objects.count() == 3
# Import to second mailbox
- with patch(
- "core.services.importer.tasks.process_mbox_file_task.delay"
- ) as mock_task:
+ with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id-2"
response = api_client.post(
IMPORT_FILE_URL,
@@ -767,7 +760,7 @@ def test_api_import_duplicate_imap_messages_different_mailboxes(
# assert Thread.objects.count() == 0
# # First import
-# with patch("core.mda.tasks.process_mbox_file_task.delay") as mock_task:
+# with patch("core.mda.tasks.process_mbox_file_task.send") as mock_task:
# mock_task.return_value.id = "fake-task-id-1"
# with open(mbox_file_path, "rb") as f:
# response = api_client.post(
@@ -814,7 +807,7 @@ def test_api_import_duplicate_imap_messages_different_mailboxes(
# assert messages[1].thread.messages.count() == 2
# # Second import of the same file
-# with patch("core.mda.tasks.process_mbox_file_task.delay") as mock_task:
+# with patch("core.mda.tasks.process_mbox_file_task.send") as mock_task:
# mock_task.return_value.id = "fake-task-id-2"
# with open(mbox_file_path, "rb") as f:
# response = api_client.post(
@@ -873,7 +866,7 @@ def test_api_import_duplicate_imap_messages_different_mailboxes(
# This is another reply to the same thread."""
-# with patch("core.mda.tasks.process_mbox_file_task.delay") as mock_task:
+# with patch("core.mda.tasks.process_mbox_file_task.send") as mock_task:
# mock_task.return_value.id = "fake-task-id-3"
# # Create a new MBOX file with just the new message
# new_mbox_content = b"From \n" + new_message_content + b"\n\n"
diff --git a/src/backend/core/tests/api/test_send_message_signature.py b/src/backend/core/tests/api/test_send_message_signature.py
index 0223ecf79..e3fe8bb3c 100644
--- a/src/backend/core/tests/api/test_send_message_signature.py
+++ b/src/backend/core/tests/api/test_send_message_signature.py
@@ -118,7 +118,7 @@ def test_api_send_message_with_signature_placeholder(
with patch("core.api.viewsets.send.send_message_task") as mock_task:
mock_task_instance = MagicMock()
mock_task_instance.id = "task-123"
- mock_task.delay.return_value = mock_task_instance
+ mock_task.send.return_value = mock_task_instance
# Send request
response = client.post(
@@ -158,7 +158,7 @@ def test_api_send_message_without_signature_placeholder(
with patch("core.api.viewsets.send.send_message_task") as mock_task:
mock_task_instance = MagicMock()
mock_task_instance.id = "task-123"
- mock_task.delay.return_value = mock_task_instance
+ mock_task.send.return_value = mock_task_instance
# Send request
response = client.post(
@@ -196,7 +196,7 @@ def test_api_send_message_with_text_body_only(
with patch("core.api.viewsets.send.send_message_task") as mock_task:
mock_task_instance = MagicMock()
mock_task_instance.id = "task-123"
- mock_task.delay.return_value = mock_task_instance
+ mock_task.send.return_value = mock_task_instance
# Send request with text body only
response = client.post(
@@ -241,7 +241,7 @@ def test_api_send_message_with_html_body_only(
with patch("core.api.viewsets.send.send_message_task") as mock_task:
mock_task_instance = MagicMock()
mock_task_instance.id = "task-123"
- mock_task.delay.return_value = mock_task_instance
+ mock_task.send.return_value = mock_task_instance
# Send request with HTML body only
response = client.post(
@@ -292,7 +292,7 @@ def test_api_send_message_with_signature_and_reply(
with patch("core.api.viewsets.send.send_message_task") as mock_task:
mock_task_instance = MagicMock()
mock_task_instance.id = "task-123"
- mock_task.delay.return_value = mock_task_instance
+ mock_task.send.return_value = mock_task_instance
# Send request
response = client.post(
@@ -333,7 +333,7 @@ def test_api_send_message_with_archive_true(
with patch("core.api.viewsets.send.send_message_task") as mock_task:
mock_task_instance = MagicMock()
mock_task_instance.id = "task-123"
- mock_task.delay.return_value = mock_task_instance
+ mock_task.send.return_value = mock_task_instance
# Send request with HTML body only
response = client.post(
@@ -350,7 +350,7 @@ def test_api_send_message_with_archive_true(
assert response.status_code == status.HTTP_200_OK
assert response.data["task_id"] == "task-123"
- mock_task.delay.assert_called_once_with(
+ mock_task.send.assert_called_once_with(
str(draft_message.id), must_archive=True
)
@@ -366,7 +366,7 @@ def test_api_send_message_with_archive_false(
with patch("core.api.viewsets.send.send_message_task") as mock_task:
mock_task_instance = MagicMock()
mock_task_instance.id = "task-123"
- mock_task.delay.return_value = mock_task_instance
+ mock_task.send.return_value = mock_task_instance
# Send request with archive=False
response = client.post(
@@ -385,7 +385,7 @@ def test_api_send_message_with_archive_false(
assert response.data["task_id"] == "task-123"
# Verify the task was called with must_archive=False
- mock_task.delay.assert_called_once_with(
+ mock_task.send.assert_called_once_with(
str(draft_message.id), must_archive=False
)
@@ -401,7 +401,7 @@ def test_api_send_message_without_archive_parameter(
with patch("core.api.viewsets.send.send_message_task") as mock_task:
mock_task_instance = MagicMock()
mock_task_instance.id = "task-123"
- mock_task.delay.return_value = mock_task_instance
+ mock_task.send.return_value = mock_task_instance
# Send request without archive parameter
response = client.post(
@@ -419,6 +419,6 @@ def test_api_send_message_without_archive_parameter(
assert response.data["task_id"] == "task-123"
# Verify the task was called with must_archive=False (default)
- mock_task.delay.assert_called_once_with(
+ mock_task.send.assert_called_once_with(
str(draft_message.id), must_archive=False
)
diff --git a/src/backend/core/tests/api/test_task.py b/src/backend/core/tests/api/test_task.py
new file mode 100644
index 000000000..43481c9cf
--- /dev/null
+++ b/src/backend/core/tests/api/test_task.py
@@ -0,0 +1,145 @@
+"""Tests for task API endpoints."""
+
+import time
+import uuid
+
+from django.core.cache import cache
+from django.test import TestCase
+
+import pytest
+from rest_framework import status
+from rest_framework.test import APIClient
+
+from core import factories
+from core.mda.tasks import send_message_task
+from core.utils import get_task_progress
+
+
+class TaskDetailViewTest(TestCase):
+ """Test the TaskDetailView API endpoint."""
+
+ def setUp(self):
+ """Set up test data."""
+ self.client = APIClient()
+ self.task_id = uuid.uuid4()
+ self.url = f"/api/v1.0/tasks/{self.task_id}/"
+
+ self.user = factories.UserFactory()
+ self.client.force_authenticate(user=self.user)
+
+ def _set_progress_data(self, task_id, progress, metadata=None):
+ """Helper method to set progress data directly in cache for testing."""
+ progress_data = {
+ "progress": progress,
+ "timestamp": time.time(),
+ "metadata": metadata or {},
+ }
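+        # Key format must mirror core.utils.set_task_progress/get_task_progress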
+ cache.set(f"task_progress:{task_id}", progress_data, timeout=86400)
+
+ def test_task_status_pending(self):
+ """Test task status when no progress data exists."""
+ # Don't set any progress data - should return PENDING
+ response = self.client.get(self.url)
+
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data["status"], "PENDING")
+ self.assertIsNone(response.data["result"])
+ self.assertIsNone(response.data["error"])
+
+ def test_task_status_progress(self):
+ """Test task status when progress data exists."""
+ # Set real progress data using helper method
+ self._set_progress_data(self.task_id, 75, {"message": "Processing batch 3"})
+
+ response = self.client.get(self.url)
+
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data["status"], "PROGRESS")
+ self.assertEqual(response.data["progress"], 75)
+ self.assertEqual(response.data["message"], "Processing batch 3")
+ self.assertIsNotNone(response.data["timestamp"])
+ self.assertIsNone(response.data["result"])
+ self.assertIsNone(response.data["error"])
+
+ def test_task_status_progress_no_message(self):
+ """Test task status when progress data exists but no message."""
+ # Set progress data without message
+ self._set_progress_data(self.task_id, 50, {})
+
+ response = self.client.get(self.url)
+
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data["status"], "PROGRESS")
+ self.assertEqual(response.data["progress"], 50)
+ self.assertIsNone(response.data["message"])
+ self.assertIsNotNone(response.data["timestamp"])
+
+ def test_task_status_requires_authentication(self):
+ """Test that the endpoint requires authentication."""
+ # Don't authenticate the client
+ response = APIClient().get(self.url)
+
+ self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
+
+ def test_task_status_with_different_task_ids(self):
+ """Test that different task IDs work correctly."""
+ task_id_2 = uuid.uuid4()
+ url_2 = f"/api/v1.0/tasks/{task_id_2}/"
+
+ # Set progress for the second task
+ self._set_progress_data(task_id_2, 25, {"message": "Starting task"})
+
+ response = self.client.get(url_2)
+
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data["status"], "PROGRESS")
+ self.assertEqual(response.data["progress"], 25)
+ self.assertEqual(response.data["message"], "Starting task")
+
+ # First task should still be pending
+ response = self.client.get(self.url)
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data["status"], "PENDING")
+
+ def test_task_progress_retrieval(self):
+ """Test that we can retrieve progress data directly."""
+ # Set progress data using helper method
+ self._set_progress_data(self.task_id, 90, {"message": "Almost done"})
+
+ # Retrieve it directly
+ progress_data = get_task_progress(self.task_id)
+
+ self.assertIsNotNone(progress_data)
+ self.assertEqual(progress_data["progress"], 90)
+ self.assertEqual(progress_data["metadata"]["message"], "Almost done")
+ self.assertIsNotNone(progress_data["timestamp"])
+
+ def test_task_progress_nonexistent(self):
+ """Test that nonexistent task returns None."""
+ nonexistent_task_id = "nonexistent-task-999"
+
+ progress_data = get_task_progress(nonexistent_task_id)
+
+ self.assertIsNone(progress_data)
+
+
+@pytest.mark.django_db
+def test_task_api_integration(worker_broker, worker):
+    """Integration test with an actual Dramatiq task."""
+    # Basic integration test to verify the task API works with real tasks.
+    # "test-message-id" is a placeholder; point it at a real Message if the
+    # task body needs one.
+
+    # Send a task
+    result = send_message_task.send("test-message-id")
+    task_id = result.message_id
+
+    # Drain the queue, then wait for the worker to finish processing
+    worker_broker.join(send_message_task.queue_name)
+    worker.join()
+
+    # Test the API endpoint
+    client = APIClient()
+    response = client.get(f"/api/v1.0/tasks/{task_id}/")
+
+    # The endpoint requires authentication, so an unauthenticated client
+    # gets a 401; authenticate the client first to exercise the happy path
+    assert response.status_code == 401
diff --git a/src/backend/core/tests/conftest.py b/src/backend/core/tests/conftest.py
index 6cb6f9f6a..3426593fa 100644
--- a/src/backend/core/tests/conftest.py
+++ b/src/backend/core/tests/conftest.py
@@ -2,6 +2,7 @@
from unittest import mock
+import dramatiq
import pytest
USER = "user"
@@ -9,6 +10,23 @@
VIA = [USER, TEAM]
+@pytest.fixture(name="worker_broker")
+def fixture_worker_broker():
+    """Return the broker configured by settings (a StubBroker under the
+    Test configuration), flushed clean."""
+ broker = dramatiq.get_broker()
+ broker.flush_all()
+ return broker
+
+
+@pytest.fixture(name="worker")
+def fixture_worker(worker_broker):
+ """Fixture that provides a Dramatiq worker for testing."""
+ worker = dramatiq.Worker(worker_broker, worker_timeout=100)
+ worker.start()
+ yield worker
+ worker.stop()
+
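+# Typical StubBroker test pattern (sketch; `some_actor` is hypothetical):
+#     some_actor.send(...)
+#     worker_broker.join(some_actor.queue_name)  # wait for the queue to drain
+#     worker.join()                              # wait for in-flight messages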
+
@pytest.fixture
def mock_user_teams():
"""Mock for the "teams" property on the User model."""
@@ -16,16 +34,3 @@ def mock_user_teams():
"core.models.User.teams", new_callable=mock.PropertyMock
) as mock_teams:
yield mock_teams
-
-
-# @pytest.fixture
-# @pytest.mark.django_db
-# def create_testdomain():
-# """Create the TESTDOMAIN."""
-# from core import models
-# models.MailDomain.objects.get_or_create(
-# name=settings.MESSAGES_TESTDOMAIN,
-# defaults={
-# "oidc_autojoin": True
-# }
-# )
diff --git a/src/backend/core/tests/importer/test_file_import.py b/src/backend/core/tests/importer/test_file_import.py
index 5d5e918ea..86940b930 100644
--- a/src/backend/core/tests/importer/test_file_import.py
+++ b/src/backend/core/tests/importer/test_file_import.py
@@ -107,7 +107,7 @@ def test_import_eml_file(admin_client, eml_file, mailbox):
mock_task.update_state = MagicMock()
with (
- patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_delay,
+ patch("core.services.importer.tasks.process_eml_file_task.send") as mock_delay,
patch.object(process_eml_file_task, "update_state", mock_task.update_state),
):
mock_delay.return_value.id = "fake-task-id"
@@ -183,7 +183,7 @@ def test_import_eml_file(admin_client, eml_file, mailbox):
@pytest.mark.django_db
def test_process_mbox_file_task(mailbox, mbox_file):
- """Test the Celery task that processes MBOX files."""
+ """Test the task that processes MBOX files."""
# Create a mock task instance
mock_task = MagicMock()
mock_task.update_state = MagicMock()
diff --git a/src/backend/core/tests/importer/test_imap_import.py b/src/backend/core/tests/importer/test_imap_import.py
index 0558ba620..21bdf8feb 100644
--- a/src/backend/core/tests/importer/test_imap_import.py
+++ b/src/backend/core/tests/importer/test_imap_import.py
@@ -14,8 +14,6 @@
from core.models import Mailbox, MailDomain, Message, Thread
from core.services.importer.tasks import import_imap_messages_task
-from messages.celery_app import app as celery_app
-
@pytest.fixture
def admin_user(db):
@@ -163,7 +161,7 @@ def test_imap_import_form_view(admin_client, mailbox):
}
with patch(
- "core.services.importer.tasks.import_imap_messages_task.delay"
+ "core.services.importer.tasks.import_imap_messages_task.send"
) as mock_task:
response = admin_client.post(url, form_data, follow=True)
assert response.status_code == 200
@@ -174,13 +172,11 @@ def test_imap_import_form_view(admin_client, mailbox):
@patch("imaplib.IMAP4_SSL")
-@patch.object(celery_app.backend, "store_result")
def test_imap_import_task_success(
- mock_store_result, mock_imap4_ssl, mailbox, mock_imap_connection, sample_email
+ mock_imap4_ssl, mailbox, mock_imap_connection, sample_email
):
"""Test successful IMAP import task execution."""
mock_imap4_ssl.return_value = mock_imap_connection
- mock_store_result.return_value = None
# Create a mock task instance
mock_task = MagicMock()
@@ -310,12 +306,11 @@ def test_imap_import_task_login_failure(mailbox):
@patch("imaplib.IMAP4_SSL")
-@patch.object(celery_app.backend, "store_result")
+@patch("core.services.importer.tasks.set_task_progress")
def test_imap_import_task_message_fetch_failure(
- mock_store_result, mock_imap4_ssl, mailbox
+ mock_set_task_progress, mock_imap4_ssl, mailbox
):
"""Test IMAP import task with message fetch failure."""
- mock_store_result.return_value = None
mock_imap = MagicMock()
mock_imap.login.return_value = ("OK", [b"Logged in"])
@@ -330,67 +325,56 @@ def test_imap_import_task_message_fetch_failure(
mock_imap.logout.return_value = ("OK", [b"Logged out"])
mock_imap4_ssl.return_value = mock_imap
- # Create a mock task instance
- mock_task = MagicMock()
- mock_task.update_state = MagicMock()
-
- with patch.object(
- import_imap_messages_task, "update_state", mock_task.update_state
- ):
- # Run the task
- task = import_imap_messages_task(
- imap_server="imap.example.com",
- imap_port=993,
- username="test@example.com",
- password="password123",
- use_ssl=True,
- recipient_id=str(mailbox.id),
- )
-
- # Verify all messages failed
- assert task["status"] == "SUCCESS"
- assert (
- task["result"]["message_status"]
- == "Completed processing messages from folder 'INBOX'"
- )
- assert task["result"]["type"] == "imap"
- assert task["result"]["total_messages"] == 3
- assert task["result"]["success_count"] == 0
- assert task["result"]["failure_count"] == 3
- assert task["result"]["current_message"] == 3
- # Verify progress updates were called correctly
- assert mock_task.update_state.call_count == 4 # 3 PROGRESS + 1 SUCCESS
+ # Run the task
+ task = import_imap_messages_task(
+ imap_server="imap.example.com",
+ imap_port=993,
+ username="test@example.com",
+ password="password123",
+ use_ssl=True,
+ recipient_id=str(mailbox.id),
+ )
- # Verify progress updates
- for i in range(1, 4):
- mock_task.update_state.assert_any_call(
- state="PROGRESS",
- meta={
- "result": {
- "message_status": f"Processing message {i} of 3",
- "total_messages": 3,
- "success_count": 0,
- "failure_count": i, # Current message failed
- "type": "imap",
- "current_message": i,
- },
- "error": None,
+ # Verify all messages failed
+ assert task["status"] == "SUCCESS"
+ assert (
+ task["result"]["message_status"]
+ == "Completed processing messages from folder 'INBOX'"
+ )
+ assert task["result"]["type"] == "imap"
+ assert task["result"]["total_messages"] == 3
+ assert task["result"]["success_count"] == 0
+ assert task["result"]["failure_count"] == 3
+ assert task["result"]["current_message"] == 3
+
-            },
-        )
-
-    # Verify success update
-    mock_task.update_state.assert_any_call(
-        state="SUCCESS",
-        meta=task,
-    )
+    # Verify progress updates were called correctly
+    assert mock_set_task_progress.call_count == 3  # 3 PROGRESS
+
+    # Verify each progress update; set_task_progress(progress, metadata)
+    # replaces Celery's update_state, so assert on the metadata payload only
+    payloads = [
+        call.kwargs.get("metadata", call.args[1] if len(call.args) > 1 else None)
+        for call in mock_set_task_progress.call_args_list
+    ]
+    for i in range(1, 4):
+        assert {
+            "result": {
+                "message_status": f"Processing message {i} of 3",
+                "total_messages": 3,
+                "success_count": 0,
+                "failure_count": i,  # Current message failed
+                "type": "imap",
+                "current_message": i,
+            },
+            "error": None,
+        } in payloads
@patch("core.mda.inbound.logger")
@patch("imaplib.IMAP4_SSL")
-@patch.object(celery_app.backend, "store_result")
+@patch("core.services.importer.tasks.set_task_progress")
def test_imap_import_task_duplicate_recipients(
- mock_store_result,
+ mock_set_task_progress,
mock_imap4_ssl,
mock_logger,
mailbox,
@@ -398,7 +382,6 @@ def test_imap_import_task_duplicate_recipients(
):
"""Test IMAP import task with duplicate recipients handles deduplication correctly."""
mock_imap4_ssl.return_value = mock_imap_connection_with_duplicates
- mock_store_result.return_value = None
# Create a mock task instance
mock_task = MagicMock()
diff --git a/src/backend/core/tests/importer/test_import_service.py b/src/backend/core/tests/importer/test_import_service.py
index 9f36704da..29e1b7bf2 100644
--- a/src/backend/core/tests/importer/test_import_service.py
+++ b/src/backend/core/tests/importer/test_import_service.py
@@ -135,7 +135,7 @@ def mbox_key(user, mbox_file):
@pytest.mark.django_db
def test_import_file_eml_by_superuser(admin_user, mailbox, eml_key, mock_request):
"""Test successful EML file import for superuser."""
- with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task:
+ with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id"
success, response_data = ImportService.import_file(
file_key=eml_key,
@@ -235,7 +235,7 @@ def test_import_file_eml_by_user_with_access_task(user, mailbox, eml_key, mock_r
# Add access to mailbox
mailbox.accesses.create(user=user, role=MailboxRoleChoices.ADMIN)
- with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task:
+ with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id"
success, response_data = ImportService.import_file(
file_key=eml_key,
@@ -338,9 +338,7 @@ def test_import_file_mbox_by_superuser_task(
):
"""Test successful MBOX file import by superuser."""
- with patch(
- "core.services.importer.tasks.process_mbox_file_task.delay"
- ) as mock_task:
+ with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id"
success, response_data = ImportService.import_file(
file_key=mbox_key,
@@ -363,9 +361,7 @@ def test_import_file_mbox_by_user_with_access_task(
# Add access to mailbox
mailbox.accesses.create(user=user, role=MailboxRoleChoices.ADMIN)
- with patch(
- "core.services.importer.tasks.process_mbox_file_task.delay"
- ) as mock_task:
+ with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task:
mock_task.return_value.id = "fake-task-id"
success, response_data = ImportService.import_file(
file_key=mbox_key,
@@ -443,7 +439,7 @@ def test_import_file_invalid_file(admin_user, mailbox, mock_request):
try:
with patch(
- "core.services.importer.tasks.process_eml_file_task.delay"
+ "core.services.importer.tasks.process_eml_file_task.send"
) as mock_task:
# The task should not be called for invalid files
mock_task.assert_not_called()
@@ -470,7 +466,7 @@ def test_import_file_invalid_file(admin_user, mailbox, mock_request):
def test_import_imap_by_superuser(admin_user, mailbox, mock_request):
"""Test successful IMAP import."""
with patch(
- "core.services.importer.tasks.import_imap_messages_task.delay"
+ "core.services.importer.tasks.import_imap_messages_task.send"
) as mock_task:
mock_task.return_value.id = "fake-task-id"
success, response_data = ImportService.import_imap(
@@ -504,7 +500,7 @@ def test_import_imap_by_user_with_access(user, mailbox, mock_request, role):
mailbox.accesses.create(user=user, role=role)
with patch(
- "core.services.importer.tasks.import_imap_messages_task.delay"
+ "core.services.importer.tasks.import_imap_messages_task.send"
) as mock_task:
mock_task.return_value.id = "fake-task-id"
success, response_data = ImportService.import_imap(
@@ -550,7 +546,7 @@ def test_import_imap_task_error(admin_user, mailbox, mock_request):
mailbox.accesses.create(user=admin_user, role=MailboxRoleChoices.ADMIN)
with patch(
- "core.services.importer.tasks.import_imap_messages_task.delay"
+ "core.services.importer.tasks.import_imap_messages_task.send"
) as mock_task:
mock_task.side_effect = Exception("Task error")
success, response_data = ImportService.import_imap(
diff --git a/src/backend/core/tests/search/test_search.py b/src/backend/core/tests/search/test_search.py
index 25371df05..1d33ddff6 100644
--- a/src/backend/core/tests/search/test_search.py
+++ b/src/backend/core/tests/search/test_search.py
@@ -17,10 +17,12 @@
delete_index,
index_message,
index_thread,
- reindex_all,
- reindex_mailbox,
search_threads,
)
+from core.services.search.tasks import (
+ reindex_all_task,
+ reindex_mailbox_task,
+)
@pytest.fixture(name="mock_es_client_search")
@@ -137,7 +139,7 @@ def test_reindex_all(mock_es_client_index):
mock_es_client_index.indices.exists.return_value = False
# Call the function
- result = reindex_all()
+ result = reindex_all_task()
# Verify result
assert result["status"] == "success"
@@ -152,7 +154,7 @@ def test_reindex_mailbox(mock_es_client_index, test_mailbox, test_thread):
"""Test reindexing a specific mailbox."""
# Call the function
- result = reindex_mailbox(str(test_mailbox.id))
+ result = reindex_mailbox_task(str(test_mailbox.id))
assert mock_es_client_index.index.call_count > 0
diff --git a/src/backend/core/utils.py b/src/backend/core/utils.py
index 299b191f7..5568aab30 100644
--- a/src/backend/core/utils.py
+++ b/src/backend/core/utils.py
@@ -1,8 +1,26 @@
"""Root utils for the core application."""
import json
+import time
+from typing import Any, Dict, Optional
+
+from django.core.cache import cache
from configurations import values
+from dramatiq import actor
+from dramatiq.middleware import CurrentMessage
+
+
+def register_task(func):
+ """Register a function as a Dramatiq task with result storage enabled.
+
+ Args:
+ func: The function to register as a task
+
+ Returns:
+ The decorated function as a Dramatiq actor
+ """
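+    # Note: store_results=True assumes a Results middleware/backend is active
+    # on the broker (django_dramatiq wires it from DRAMATIQ_RESULT_BACKEND).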
+ return actor(store_results=True)(func)
class JSONValue(values.Value):
@@ -16,3 +34,36 @@ def to_python(self, value):
Return the python representation of the JSON string.
"""
return json.loads(value)
+
+
+def set_task_progress(progress: int, metadata: Optional[Dict[str, Any]] = None) -> None:
+ """Set task progress in cache.
+
+ Args:
+ progress: Progress percentage (0-100)
+ metadata: Optional metadata dictionary
+ """
+ # Get the current message ID from Dramatiq CurrentMessage middleware
+ current_message = CurrentMessage.get_current_message()
+ if not current_message:
+ return # Do nothing if no current message
+
+ task_id = current_message.message_id
+ progress_data = {
+ "progress": progress,
+ "timestamp": time.time(),
+ "metadata": metadata or {},
+ }
+ cache.set(f"task_progress:{task_id}", progress_data, timeout=86400) # 24 hours
+
+
+def get_task_progress(task_id: str) -> Optional[Dict[str, Any]]:
+ """Get task progress from cache.
+
+ Args:
+ task_id: Task identifier
+
+ Returns:
+ Progress data or None if not found
+ """
+ return cache.get(f"task_progress:{task_id}")
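+
+
+# Example usage inside an actor (a sketch; `import_batches` is hypothetical):
+#
+#     @register_task
+#     def import_batches(batch_ids):
+#         for i, batch_id in enumerate(batch_ids, start=1):
+#             ...  # process the batch
+#             set_task_progress(
+#                 int(i / len(batch_ids) * 100),
+#                 metadata={"message": f"Processing batch {i}"},
+#             )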
diff --git a/src/backend/core/worker_setup.py b/src/backend/core/worker_setup.py
new file mode 100644
index 000000000..60e809c6a
--- /dev/null
+++ b/src/backend/core/worker_setup.py
@@ -0,0 +1,11 @@
+"""Worker setup module for Dramatiq workers.
+
+This module configures Django and imports configurations for worker processes.
+"""
+
+import django
+
+from configurations.importer import install
+
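+# DJANGO_SETTINGS_MODULE and DJANGO_CONFIGURATION are expected to be provided
+# by the worker's environment (see env.d/ and compose.yaml); unlike the old
+# celery_app module, no defaults are applied here.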
+install(check_options=True)
+django.setup()
diff --git a/src/backend/messages/__init__.py b/src/backend/messages/__init__.py
index bf4eac6f6..db8a90f94 100644
--- a/src/backend/messages/__init__.py
+++ b/src/backend/messages/__init__.py
@@ -1,5 +1 @@
"""Messages module."""
-
-from .celery_app import app as celery_app
-
-__all__ = ("celery_app",)
diff --git a/src/backend/messages/celery_app.py b/src/backend/messages/celery_app.py
deleted file mode 100644
index 0b2cb7db8..000000000
--- a/src/backend/messages/celery_app.py
+++ /dev/null
@@ -1,43 +0,0 @@
-"""Messages celery configuration file."""
-
-import os
-
-from celery import Celery
-from configurations.importer import install
-
-# Set the default Django settings module for the 'celery' program.
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "messages.settings")
-os.environ.setdefault("DJANGO_CONFIGURATION", "Development")
-
-install(check_options=True)
-
-# Must be imported after install()
-from django.conf import settings # pylint: disable=wrong-import-position
-
-app = Celery("messages")
-
-# Using a string here means the worker doesn't have to serialize
-# the configuration object to child processes.
-# - namespace='CELERY' means all celery-related configuration keys
-# should have a `CELERY_` prefix.
-app.config_from_object("django.conf:settings", namespace="CELERY")
-
-# Load task modules from all registered Django apps.
-app.autodiscover_tasks()
-
-# Configure beat schedule
-# This can be disabled manually, for example when pushing the application for the first time
-# to a PaaS service when no migration was applied yet.
-if not settings.DISABLE_CELERY_BEAT_SCHEDULE:
- app.conf.beat_schedule = {
- "retry-pending-messages": {
- "task": "core.mda.tasks.retry_messages_task",
- "schedule": 300.0, # Every 5 minutes (300 seconds)
- "options": {"queue": "default"},
- },
- "selfcheck": {
- "task": "core.mda.tasks.selfcheck_task",
- "schedule": settings.MESSAGES_SELFCHECK_INTERVAL,
- "options": {"queue": "default"},
- },
- }
diff --git a/src/backend/messages/settings.py b/src/backend/messages/settings.py
index e56a9f29c..f96f2762a 100755
--- a/src/backend/messages/settings.py
+++ b/src/backend/messages/settings.py
@@ -441,8 +441,8 @@ class Base(Configuration):
"drf_spectacular",
# Third party apps
"corsheaders",
- "django_celery_beat",
- "django_celery_results",
+ "django_dramatiq",
+ "dramatiq_crontab",
"django_filters",
"rest_framework",
"parler",
@@ -528,19 +528,62 @@ class Base(Configuration):
None, environ_name="FRONTEND_THEME", environ_prefix=None
)
- # Celery
- CELERY_BROKER_URL = values.Value(
- "redis://redis:6379", environ_name="CELERY_BROKER_URL", environ_prefix=None
- )
- CELERY_RESULT_BACKEND = "django-db"
- CELERY_CACHE_BACKEND = "django-cache"
- CELERY_BROKER_TRANSPORT_OPTIONS = values.DictValue({})
- CELERY_RESULT_EXTENDED = True
- CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 * 30 # 30 days
- CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
+ # Dramatiq
+ DRAMATIQ_BROKER = {
+ "BROKER": "dramatiq.brokers.redis.RedisBroker",
+ "OPTIONS": {
+ "url": values.Value(
+ "redis://redis:6379",
+ environ_name="DRAMATIQ_BROKER_URL",
+ environ_prefix=None,
+ ),
+ },
+ "MIDDLEWARE": [
+ "dramatiq.middleware.Prometheus",
+ "dramatiq.middleware.AgeLimit",
+ "dramatiq.middleware.TimeLimit",
+ "dramatiq.middleware.Callbacks",
+ "dramatiq.middleware.Retries",
+ "dramatiq.middleware.CurrentMessage",
+ # "dramatiq.results.middleware.Results",
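+            # (django_dramatiq wires the Results middleware itself from the
+            # DRAMATIQ_RESULT_BACKEND setting below, so it is not listed here)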
+ "django_dramatiq.middleware.DbConnectionsMiddleware",
+ "django_dramatiq.middleware.AdminMiddleware",
+ ],
+ }
+
+ DRAMATIQ_RESULT_BACKEND = {
+ "BACKEND": "dramatiq.results.backends.redis.RedisBackend",
+ "BACKEND_OPTIONS": {
+ "url": values.Value(
+ "redis://redis:6379",
+ environ_name="DRAMATIQ_RESULT_BACKEND",
+ environ_prefix=None,
+ ),
+ },
+ "MIDDLEWARE_OPTIONS": {
+ "result_ttl": 7 * 24 * 60 * 60 # 7 days
+ },
+ }
+
+ # Defines which database should be used to persist Task objects when the
+ # AdminMiddleware is enabled. The default value is "default".
+ DRAMATIQ_TASKS_DATABASE = "default"
+
+ # Dramatiq Crontab configuration
+ DRAMATIQ_CRONTAB = {
+ "REDIS_URL": values.Value(
+ "redis://redis:6379/0",
+ environ_name="DRAMATIQ_CRONTAB_REDIS_URL",
+ environ_prefix=None,
+ ),
+ }
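+    # Periodic tasks are declared with dramatiq_crontab's decorator; a sketch
+    # mirroring the old beat schedule entry for retry_messages_task:
+    #
+    #     from dramatiq_crontab import cron
+    #
+    #     @cron("*/5 * * * *")  # every 5 minutes
+    #     @dramatiq.actor
+    #     def retry_messages_task(): ...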
- DISABLE_CELERY_BEAT_SCHEDULE = values.BooleanValue(
- default=False, environ_name="DISABLE_CELERY_BEAT_SCHEDULE", environ_prefix=None
+ DRAMATIQ_AUTODISCOVER_MODULES = [
+ "tasks"
+    ]  # could also list e.g. "services.search.tasks", "services.importer.tasks"
+
+ DISABLE_DRAMATIQ_SCHEDULE = values.BooleanValue(
+ default=False, environ_name="DISABLE_DRAMATIQ_SCHEDULE", environ_prefix=None
)
# Session
@@ -900,7 +943,21 @@ class DevelopmentMinimal(Development):
Development environment settings with minimal dependencies
"""
- CELERY_TASK_ALWAYS_EAGER = True
+ DRAMATIQ_BROKER = {
+ "BROKER": "dramatiq.brokers.stub.StubBroker",
+ "OPTIONS": {},
+ "MIDDLEWARE": [
+ "dramatiq.middleware.AgeLimit",
+ "dramatiq.middleware.TimeLimit",
+ "dramatiq.middleware.Callbacks",
+ "dramatiq.middleware.Retries",
+ "dramatiq.middleware.CurrentMessage",
+            # "dramatiq.results.middleware.Results",
+ "django_dramatiq.middleware.DbConnectionsMiddleware",
+ "django_dramatiq.middleware.AdminMiddleware",
+ ],
+ }
+
OPENSEARCH_INDEX_THREADS = False
CACHES = {
"default": {
@@ -920,7 +977,20 @@ class Test(Base):
IDENTITY_PROVIDER = None
- CELERY_TASK_ALWAYS_EAGER = values.BooleanValue(True)
+ DRAMATIQ_BROKER = {
+ "BROKER": "dramatiq.brokers.stub.StubBroker",
+ "OPTIONS": {},
+ "MIDDLEWARE": [
+ "dramatiq.middleware.AgeLimit",
+ "dramatiq.middleware.TimeLimit",
+ "dramatiq.middleware.Callbacks",
+ "dramatiq.middleware.Retries",
+ "dramatiq.middleware.CurrentMessage",
+ # "dramatiq.results.middleware.Results",
+ "django_dramatiq.middleware.DbConnectionsMiddleware",
+ "django_dramatiq.middleware.AdminMiddleware",
+ ],
+ }
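+    # With the StubBroker, .send() only enqueues; messages execute once a
+    # dramatiq.Worker drains the queue (see the worker fixtures in conftest.py)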
AWS_S3_DOMAIN_REPLACE = None
diff --git a/src/backend/messages/urls.py b/src/backend/messages/urls.py
index 4b6dc5581..2871ac3fa 100644
--- a/src/backend/messages/urls.py
+++ b/src/backend/messages/urls.py
@@ -2,7 +2,6 @@
from django.conf import settings
from django.conf.urls.static import static
-from django.contrib import admin
from django.contrib.staticfiles.urls import staticfiles_urlpatterns
from django.http import HttpResponse
from django.urls import include, path, re_path
@@ -13,8 +12,10 @@
SpectacularSwaggerView,
)
+from core.admin import admin_site
+
urlpatterns = [
- path(settings.ADMIN_URL, admin.site.urls),
+ path(settings.ADMIN_URL, admin_site.urls),
path("", include("core.urls")),
path(
"healthz/",
diff --git a/src/backend/poetry.lock b/src/backend/poetry.lock
index f53a9c846..916b6ee43 100644
--- a/src/backend/poetry.lock
+++ b/src/backend/poetry.lock
@@ -16,9 +16,10 @@ files = [
name = "amqp"
version = "5.3.1"
description = "Low-level AMQP client for Python (fork of amqplib)."
-optional = false
+optional = true
python-versions = ">=3.6"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "amqp-5.3.1-py3-none-any.whl", hash = "sha256:43b3319e1b4e7d1251833a93d672b4af1e40f3d632d479b98661a95f117880a2"},
{file = "amqp-5.3.1.tar.gz", hash = "sha256:cddc00c725449522023bad949f70fff7b48f0b1ade74d170a6f10ab044739432"},
@@ -58,6 +59,34 @@ sniffio = ">=1.1"
[package.extras]
trio = ["trio (>=0.26.1)"]
+[[package]]
+name = "apscheduler"
+version = "3.11.0"
+description = "In-process task scheduler with Cron-like capabilities"
+optional = false
+python-versions = ">=3.8"
+groups = ["main"]
+files = [
+ {file = "APScheduler-3.11.0-py3-none-any.whl", hash = "sha256:fc134ca32e50f5eadcc4938e3a4545ab19131435e851abb40b34d63d5141c6da"},
+ {file = "apscheduler-3.11.0.tar.gz", hash = "sha256:4c622d250b0955a65d5d0eb91c33e6d43fd879834bf541e0a18661ae60460133"},
+]
+
+[package.dependencies]
+tzlocal = ">=3.0"
+
+[package.extras]
+doc = ["packaging", "sphinx", "sphinx-rtd-theme (>=1.3.0)"]
+etcd = ["etcd3", "protobuf (<=3.21.0)"]
+gevent = ["gevent"]
+mongodb = ["pymongo (>=3.0)"]
+redis = ["redis (>=3.0)"]
+rethinkdb = ["rethinkdb (>=2.4.0)"]
+sqlalchemy = ["sqlalchemy (>=1.4)"]
+test = ["APScheduler[etcd,mongodb,redis,rethinkdb,sqlalchemy,tornado,zookeeper]", "PySide6 ; platform_python_implementation == \"CPython\" and python_version < \"3.14\"", "anyio (>=4.5.2)", "gevent ; python_version < \"3.14\"", "pytest", "pytz", "twisted ; python_version < \"3.14\""]
+tornado = ["tornado (>=4.3)"]
+twisted = ["twisted"]
+zookeeper = ["kazoo"]
+
[[package]]
name = "asgiref"
version = "3.9.1"
@@ -122,9 +151,10 @@ tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" a
name = "billiard"
version = "4.2.1"
description = "Python multiprocessing fork with improvements and bugfixes"
-optional = false
+optional = true
python-versions = ">=3.7"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "billiard-4.2.1-py3-none-any.whl", hash = "sha256:40b59a4ac8806ba2c2369ea98d876bc6108b051c227baffd928c644d15d8f3cb"},
{file = "billiard-4.2.1.tar.gz", hash = "sha256:12b641b0c539073fc8d3f5b8b7be998956665c4233c7c1fcd66a7e677c4fb36f"},
@@ -216,9 +246,10 @@ redis = ["redis (>=2.10.5)"]
name = "celery"
version = "5.5.2"
description = "Distributed Task Queue."
-optional = false
+optional = true
python-versions = ">=3.8"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "celery-5.5.2-py3-none-any.whl", hash = "sha256:54425a067afdc88b57cd8d94ed4af2ffaf13ab8c7680041ac2c4ac44357bdf4c"},
{file = "celery-5.5.2.tar.gz", hash = "sha256:4d6930f354f9d29295425d7a37261245c74a32807c45d764bedc286afd0e724e"},
@@ -232,7 +263,6 @@ click-plugins = ">=1.1.1"
click-repl = ">=0.2.0"
kombu = ">=5.5.2,<5.6"
python-dateutil = ">=2.8.2"
-redis = {version = ">=4.5.2,<4.5.5 || >4.5.5,<6.0.0", optional = true, markers = "extra == \"redis\""}
vine = ">=5.1.0,<6.0"
[package.extras]
@@ -468,9 +498,10 @@ files = [
name = "click"
version = "8.2.1"
description = "Composable command line interface toolkit"
-optional = false
+optional = true
python-versions = ">=3.10"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "click-8.2.1-py3-none-any.whl", hash = "sha256:61a3265b914e850b85317d0b3109c7f8cd35a670f963866005d6ef1d5175a12b"},
{file = "click-8.2.1.tar.gz", hash = "sha256:27c491cc05d968d271d5a1db13e3b5a184636d9d930f148c50b038f0d0646202"},
@@ -483,9 +514,10 @@ colorama = {version = "*", markers = "platform_system == \"Windows\""}
name = "click-didyoumean"
version = "0.3.1"
description = "Enables git-like *did-you-mean* feature in click"
-optional = false
+optional = true
python-versions = ">=3.6.2"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "click_didyoumean-0.3.1-py3-none-any.whl", hash = "sha256:5c4bb6007cfea5f2fd6583a2fb6701a22a41eb98957e63d0fac41c10e7c3117c"},
{file = "click_didyoumean-0.3.1.tar.gz", hash = "sha256:4f82fdff0dbe64ef8ab2279bd6aa3f6a99c3b28c05aa09cbfc07c9d7fbb5a463"},
@@ -498,9 +530,10 @@ click = ">=7"
name = "click-plugins"
version = "1.1.1.2"
description = "An extension module for click to enable registering CLI commands via setuptools entry-points."
-optional = false
+optional = true
python-versions = "*"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "click_plugins-1.1.1.2-py2.py3-none-any.whl", hash = "sha256:008d65743833ffc1f5417bf0e78e8d2c23aab04d9745ba817bd3e71b0feb6aa6"},
{file = "click_plugins-1.1.1.2.tar.gz", hash = "sha256:d7af3984a99d243c131aa1a828331e7630f4a88a9741fd05c927b204bcf92261"},
@@ -516,9 +549,10 @@ dev = ["coveralls", "pytest (>=3.6)", "pytest-cov", "wheel"]
name = "click-repl"
version = "0.3.0"
description = "REPL plugin for Click"
-optional = false
+optional = true
python-versions = ">=3.6"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "click-repl-0.3.0.tar.gz", hash = "sha256:17849c23dba3d667247dc4defe1757fff98694e90fe37474f3feebb69ced26a9"},
{file = "click_repl-0.3.0-py3-none-any.whl", hash = "sha256:fb7e06deb8da8de86180a33a9da97ac316751c094c6899382da7feeeeb51b812"},
@@ -646,25 +680,6 @@ files = [
[package.extras]
toml = ["tomli ; python_full_version <= \"3.11.0a6\""]
-[[package]]
-name = "cron-descriptor"
-version = "2.0.6"
-description = "A Python library that converts cron expressions into human readable strings."
-optional = false
-python-versions = ">=3.9"
-groups = ["main"]
-files = [
- {file = "cron_descriptor-2.0.6-py3-none-any.whl", hash = "sha256:3a1c0d837c0e5a32e415f821b36cf758eb92d510e6beff8fbfe4fa16573d93d6"},
- {file = "cron_descriptor-2.0.6.tar.gz", hash = "sha256:e39d2848e1d8913cfb6e3452e701b5eec662ee18bea8cc5aa53ee1a7bb217157"},
-]
-
-[package.dependencies]
-typing_extensions = "*"
-
-[package.extras]
-dev = ["mypy", "polib", "ruff"]
-test = ["pytest"]
-
[[package]]
name = "cryptography"
version = "45.0.5"
@@ -843,42 +858,6 @@ tzdata = {version = "*", markers = "sys_platform == \"win32\""}
argon2 = ["argon2-cffi (>=19.1.0)"]
bcrypt = ["bcrypt"]
-[[package]]
-name = "django-celery-beat"
-version = "2.8.0"
-description = "Database-backed Periodic Tasks."
-optional = false
-python-versions = ">=3.8"
-groups = ["main"]
-files = [
- {file = "django_celery_beat-2.8.0-py3-none-any.whl", hash = "sha256:f8fd2e1ffbfa8e570ab9439383b2cd15a6642b347662d0de79c62ba6f68d4b38"},
- {file = "django_celery_beat-2.8.0.tar.gz", hash = "sha256:955bfb3c4b8f1026a8d20144d0da39c941e1eb23acbaee9e12a7e7cc1f74959a"},
-]
-
-[package.dependencies]
-celery = ">=5.2.3,<6.0"
-cron-descriptor = ">=1.2.32"
-Django = ">=2.2,<6.0"
-django-timezone-field = ">=5.0"
-python-crontab = ">=2.3.4"
-tzdata = "*"
-
-[[package]]
-name = "django-celery-results"
-version = "2.6.0"
-description = "Celery result backends for Django."
-optional = false
-python-versions = "*"
-groups = ["main"]
-files = [
- {file = "django_celery_results-2.6.0-py3-none-any.whl", hash = "sha256:b9ccdca2695b98c7cbbb8dea742311ba9a92773d71d7b4944a676e69a7df1c73"},
- {file = "django_celery_results-2.6.0.tar.gz", hash = "sha256:9abcd836ae6b61063779244d8887a88fe80bbfaba143df36d3cb07034671277c"},
-]
-
-[package.dependencies]
-celery = ">=5.2.7,<6.0"
-Django = ">=3.2.25"
-
[[package]]
name = "django-configurations"
version = "2.5.1"
@@ -939,6 +918,25 @@ maintainer = ["django", "zest.releaser[recommended]"]
pyuca = ["pyuca"]
test = ["djangorestframework", "graphene-django", "pytest", "pytest-cov", "pytest-django"]
+[[package]]
+name = "django-dramatiq"
+version = "0.14.0"
+description = "A Django app for Dramatiq."
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+ {file = "django_dramatiq-0.14.0-py3-none-any.whl", hash = "sha256:78da4a5cccecf7aa099bf66a6d706269f2d0541056a4e9cdc1e54a043f38f4e1"},
+ {file = "django_dramatiq-0.14.0.tar.gz", hash = "sha256:dd957fa3c3d5106830ce90f10f72a8f07249643c5f35ccfaee1b9a79f7f5d6dd"},
+]
+
+[package.dependencies]
+django = ">=4.2"
+dramatiq = ">=1.11"
+
+[package.extras]
+dev = ["flake8", "flake8-quotes", "isort", "pytest", "pytest-cov", "pytest-django", "twine"]
+
[[package]]
name = "django-extensions"
version = "3.2.3"
@@ -1131,6 +1129,78 @@ idna = ["idna (>=3.7)"]
trio = ["trio (>=0.23)"]
wmi = ["wmi (>=1.5.1)"]
+[[package]]
+name = "dramatiq"
+version = "1.18.0"
+description = "Background Processing for Python 3."
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+ {file = "dramatiq-1.18.0-py3-none-any.whl", hash = "sha256:d360f608aa3cd06f5db714bfcd23825dc7098bacfee52aca536b0bb0faae3c69"},
+ {file = "dramatiq-1.18.0.tar.gz", hash = "sha256:5ea436b6e50dae64d4de04f1eb519ad239a6b1ba6315ba1dce1c0c4c1ebedfaf"},
+]
+
+[package.dependencies]
+prometheus-client = ">=0.2"
+redis = {version = ">=2.0,<7.0", optional = true, markers = "extra == \"redis\""}
+
+[package.extras]
+all = ["gevent (>=1.1)", "pika (>=1.0,<2.0)", "pylibmc (>=1.5,<2.0)", "redis (>=2.0,<7.0)", "watchdog (>=4.0)", "watchdog_gevent (>=0.2)"]
+dev = ["alabaster", "bumpversion", "flake8", "flake8-bugbear", "flake8-quotes", "gevent (>=1.1)", "hiredis", "isort", "mypy", "pika (>=1.0,<2.0)", "pylibmc (>=1.5,<2.0)", "pytest", "pytest-benchmark[histogram]", "pytest-cov", "redis (>=2.0,<7.0)", "sphinx", "sphinxcontrib-napoleon", "tox", "twine", "watchdog (>=4.0)", "watchdog_gevent (>=0.2)", "wheel"]
+gevent = ["gevent (>=1.1)"]
+memcached = ["pylibmc (>=1.5,<2.0)"]
+rabbitmq = ["pika (>=1.0,<2.0)"]
+redis = ["redis (>=2.0,<7.0)"]
+watch = ["watchdog (>=4.0)", "watchdog_gevent (>=0.2)"]
+
+[[package]]
+name = "dramatiq-crontab"
+version = "1.0.12"
+description = "Cron style scheduler for asynchronous Dramatiq tasks in Django."
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+ {file = "dramatiq_crontab-1.0.12-py3-none-any.whl", hash = "sha256:417d1da76fd423c02466db6e1091bb089cf76596331951447283fb9d6720ab8c"},
+ {file = "dramatiq_crontab-1.0.12.tar.gz", hash = "sha256:6192289cb7fe16aa698cd6a78aa86e40ae4fb1fe043a55a028cfbf3c2299c8dd"},
+]
+
+[package.dependencies]
+apscheduler = "*"
+django = "*"
+dramatiq = "*"
+sentry-sdk = {version = "*", optional = true, markers = "extra == \"sentry\""}
+
+[package.extras]
+lint = ["black (==25.1.0)", "ruff (==0.12.7)"]
+redis = ["redis"]
+sentry = ["sentry-sdk"]
+test = ["backports.zoneinfo ; python_version < \"3.9\"", "dramatiq", "pytest", "pytest-cov", "pytest-django"]
+
+[[package]]
+name = "dramatiq_dashboard"
+version = "0.4.0"
+description = "A dashboard for Dramatiq (Redis-only!)."
+optional = false
+python-versions = ">=3.6"
+groups = ["main"]
+files = []
+develop = false
+
+[package.dependencies]
+dramatiq = {version = ">=1.6,<2.0", extras = ["redis"]}
+jinja2 = ">=2"
+
+[package.extras]
+dev = ["alabaster", "bumpversion", "flake8", "flake8-bugbear", "flake8-quotes", "hiredis", "isort", "pytest", "pytest-benchmark[histogram]", "pytest-cov", "sphinx (<1.8)", "sphinxcontrib-napoleon", "tox", "twine", "wheel"]
+
+[package.source]
+type = "git"
+url = "https://github.com/dbowring/dramatiq_dashboard"
+reference = "128ee38e2b5efc36c4ae964a934cffc41baaf896"
+resolved_reference = "128ee38e2b5efc36c4ae964a934cffc41baaf896"
+
[[package]]
name = "drf-spectacular"
version = "0.28.0"
@@ -1464,6 +1534,24 @@ files = [
colors = ["colorama"]
plugins = ["setuptools"]
+[[package]]
+name = "jinja2"
+version = "3.1.6"
+description = "A very fast and expressive template engine."
+optional = false
+python-versions = ">=3.7"
+groups = ["main"]
+files = [
+ {file = "jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67"},
+ {file = "jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d"},
+]
+
+[package.dependencies]
+MarkupSafe = ">=2.0"
+
+[package.extras]
+i18n = ["Babel (>=2.7)"]
+
[[package]]
name = "jiter"
version = "0.10.0"
@@ -1638,9 +1726,10 @@ typing-extensions = ">=4.5.0"
name = "kombu"
version = "5.5.4"
description = "Messaging library for Python."
-optional = false
+optional = true
python-versions = ">=3.8"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "kombu-5.5.4-py3-none-any.whl", hash = "sha256:a12ed0557c238897d8e518f1d1fdf84bd1516c5e305af2dacd85c2015115feb8"},
{file = "kombu-5.5.4.tar.gz", hash = "sha256:886600168275ebeada93b888e831352fe578168342f0d1d5833d88ba0d847363"},
@@ -1726,6 +1815,105 @@ profiling = ["gprof2dot"]
rtd = ["ipykernel", "jupyter_sphinx", "mdit-py-plugins (>=0.5.0)", "myst-parser", "pyyaml", "sphinx", "sphinx-book-theme (>=1.0,<2.0)", "sphinx-copybutton", "sphinx-design"]
testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions", "requests"]
+[[package]]
+name = "markupsafe"
+version = "3.0.3"
+description = "Safely add untrusted strings to HTML/XML markup."
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+ {file = "markupsafe-3.0.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:2f981d352f04553a7171b8e44369f2af4055f888dfb147d55e42d29e29e74559"},
+ {file = "markupsafe-3.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e1c1493fb6e50ab01d20a22826e57520f1284df32f2d8601fdd90b6304601419"},
+ {file = "markupsafe-3.0.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1ba88449deb3de88bd40044603fafffb7bc2b055d626a330323a9ed736661695"},
+ {file = "markupsafe-3.0.3-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f42d0984e947b8adf7dd6dde396e720934d12c506ce84eea8476409563607591"},
+ {file = "markupsafe-3.0.3-cp310-cp310-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:c0c0b3ade1c0b13b936d7970b1d37a57acde9199dc2aecc4c336773e1d86049c"},
+ {file = "markupsafe-3.0.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:0303439a41979d9e74d18ff5e2dd8c43ed6c6001fd40e5bf2e43f7bd9bbc523f"},
+ {file = "markupsafe-3.0.3-cp310-cp310-musllinux_1_2_riscv64.whl", hash = "sha256:d2ee202e79d8ed691ceebae8e0486bd9a2cd4794cec4824e1c99b6f5009502f6"},
+ {file = "markupsafe-3.0.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:177b5253b2834fe3678cb4a5f0059808258584c559193998be2601324fdeafb1"},
+ {file = "markupsafe-3.0.3-cp310-cp310-win32.whl", hash = "sha256:2a15a08b17dd94c53a1da0438822d70ebcd13f8c3a95abe3a9ef9f11a94830aa"},
+ {file = "markupsafe-3.0.3-cp310-cp310-win_amd64.whl", hash = "sha256:c4ffb7ebf07cfe8931028e3e4c85f0357459a3f9f9490886198848f4fa002ec8"},
+ {file = "markupsafe-3.0.3-cp310-cp310-win_arm64.whl", hash = "sha256:e2103a929dfa2fcaf9bb4e7c091983a49c9ac3b19c9061b6d5427dd7d14d81a1"},
+ {file = "markupsafe-3.0.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:1cc7ea17a6824959616c525620e387f6dd30fec8cb44f649e31712db02123dad"},
+ {file = "markupsafe-3.0.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4bd4cd07944443f5a265608cc6aab442e4f74dff8088b0dfc8238647b8f6ae9a"},
+ {file = "markupsafe-3.0.3-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6b5420a1d9450023228968e7e6a9ce57f65d148ab56d2313fcd589eee96a7a50"},
+ {file = "markupsafe-3.0.3-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0bf2a864d67e76e5c9a34dc26ec616a66b9888e25e7b9460e1c76d3293bd9dbf"},
+ {file = "markupsafe-3.0.3-cp311-cp311-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:bc51efed119bc9cfdf792cdeaa4d67e8f6fcccab66ed4bfdd6bde3e59bfcbb2f"},
+ {file = "markupsafe-3.0.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:068f375c472b3e7acbe2d5318dea141359e6900156b5b2ba06a30b169086b91a"},
+ {file = "markupsafe-3.0.3-cp311-cp311-musllinux_1_2_riscv64.whl", hash = "sha256:7be7b61bb172e1ed687f1754f8e7484f1c8019780f6f6b0786e76bb01c2ae115"},
+ {file = "markupsafe-3.0.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:f9e130248f4462aaa8e2552d547f36ddadbeaa573879158d721bbd33dfe4743a"},
+ {file = "markupsafe-3.0.3-cp311-cp311-win32.whl", hash = "sha256:0db14f5dafddbb6d9208827849fad01f1a2609380add406671a26386cdf15a19"},
+ {file = "markupsafe-3.0.3-cp311-cp311-win_amd64.whl", hash = "sha256:de8a88e63464af587c950061a5e6a67d3632e36df62b986892331d4620a35c01"},
+ {file = "markupsafe-3.0.3-cp311-cp311-win_arm64.whl", hash = "sha256:3b562dd9e9ea93f13d53989d23a7e775fdfd1066c33494ff43f5418bc8c58a5c"},
+ {file = "markupsafe-3.0.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:d53197da72cc091b024dd97249dfc7794d6a56530370992a5e1a08983ad9230e"},
+ {file = "markupsafe-3.0.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1872df69a4de6aead3491198eaf13810b565bdbeec3ae2dc8780f14458ec73ce"},
+ {file = "markupsafe-3.0.3-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3a7e8ae81ae39e62a41ec302f972ba6ae23a5c5396c8e60113e9066ef893da0d"},
+ {file = "markupsafe-3.0.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d6dd0be5b5b189d31db7cda48b91d7e0a9795f31430b7f271219ab30f1d3ac9d"},
+ {file = "markupsafe-3.0.3-cp312-cp312-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:94c6f0bb423f739146aec64595853541634bde58b2135f27f61c1ffd1cd4d16a"},
+ {file = "markupsafe-3.0.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:be8813b57049a7dc738189df53d69395eba14fb99345e0a5994914a3864c8a4b"},
+ {file = "markupsafe-3.0.3-cp312-cp312-musllinux_1_2_riscv64.whl", hash = "sha256:83891d0e9fb81a825d9a6d61e3f07550ca70a076484292a70fde82c4b807286f"},
+ {file = "markupsafe-3.0.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:77f0643abe7495da77fb436f50f8dab76dbc6e5fd25d39589a0f1fe6548bfa2b"},
+ {file = "markupsafe-3.0.3-cp312-cp312-win32.whl", hash = "sha256:d88b440e37a16e651bda4c7c2b930eb586fd15ca7406cb39e211fcff3bf3017d"},
+ {file = "markupsafe-3.0.3-cp312-cp312-win_amd64.whl", hash = "sha256:26a5784ded40c9e318cfc2bdb30fe164bdb8665ded9cd64d500a34fb42067b1c"},
+ {file = "markupsafe-3.0.3-cp312-cp312-win_arm64.whl", hash = "sha256:35add3b638a5d900e807944a078b51922212fb3dedb01633a8defc4b01a3c85f"},
+ {file = "markupsafe-3.0.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:e1cf1972137e83c5d4c136c43ced9ac51d0e124706ee1c8aa8532c1287fa8795"},
+ {file = "markupsafe-3.0.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:116bb52f642a37c115f517494ea5feb03889e04df47eeff5b130b1808ce7c219"},
+ {file = "markupsafe-3.0.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:133a43e73a802c5562be9bbcd03d090aa5a1fe899db609c29e8c8d815c5f6de6"},
+ {file = "markupsafe-3.0.3-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ccfcd093f13f0f0b7fdd0f198b90053bf7b2f02a3927a30e63f3ccc9df56b676"},
+ {file = "markupsafe-3.0.3-cp313-cp313-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:509fa21c6deb7a7a273d629cf5ec029bc209d1a51178615ddf718f5918992ab9"},
+ {file = "markupsafe-3.0.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:a4afe79fb3de0b7097d81da19090f4df4f8d3a2b3adaa8764138aac2e44f3af1"},
+ {file = "markupsafe-3.0.3-cp313-cp313-musllinux_1_2_riscv64.whl", hash = "sha256:795e7751525cae078558e679d646ae45574b47ed6e7771863fcc079a6171a0fc"},
+ {file = "markupsafe-3.0.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:8485f406a96febb5140bfeca44a73e3ce5116b2501ac54fe953e488fb1d03b12"},
+ {file = "markupsafe-3.0.3-cp313-cp313-win32.whl", hash = "sha256:bdd37121970bfd8be76c5fb069c7751683bdf373db1ed6c010162b2a130248ed"},
+ {file = "markupsafe-3.0.3-cp313-cp313-win_amd64.whl", hash = "sha256:9a1abfdc021a164803f4d485104931fb8f8c1efd55bc6b748d2f5774e78b62c5"},
+ {file = "markupsafe-3.0.3-cp313-cp313-win_arm64.whl", hash = "sha256:7e68f88e5b8799aa49c85cd116c932a1ac15caaa3f5db09087854d218359e485"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:218551f6df4868a8d527e3062d0fb968682fe92054e89978594c28e642c43a73"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:3524b778fe5cfb3452a09d31e7b5adefeea8c5be1d43c4f810ba09f2ceb29d37"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:4e885a3d1efa2eadc93c894a21770e4bc67899e3543680313b09f139e149ab19"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8709b08f4a89aa7586de0aadc8da56180242ee0ada3999749b183aa23df95025"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:b8512a91625c9b3da6f127803b166b629725e68af71f8184ae7e7d54686a56d6"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:9b79b7a16f7fedff2495d684f2b59b0457c3b493778c9eed31111be64d58279f"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-musllinux_1_2_riscv64.whl", hash = "sha256:12c63dfb4a98206f045aa9563db46507995f7ef6d83b2f68eda65c307c6829eb"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:8f71bc33915be5186016f675cd83a1e08523649b0e33efdb898db577ef5bb009"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-win32.whl", hash = "sha256:69c0b73548bc525c8cb9a251cddf1931d1db4d2258e9599c28c07ef3580ef354"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-win_amd64.whl", hash = "sha256:1b4b79e8ebf6b55351f0d91fe80f893b4743f104bff22e90697db1590e47a218"},
+ {file = "markupsafe-3.0.3-cp313-cp313t-win_arm64.whl", hash = "sha256:ad2cf8aa28b8c020ab2fc8287b0f823d0a7d8630784c31e9ee5edea20f406287"},
+ {file = "markupsafe-3.0.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:eaa9599de571d72e2daf60164784109f19978b327a3910d3e9de8c97b5b70cfe"},
+ {file = "markupsafe-3.0.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:c47a551199eb8eb2121d4f0f15ae0f923d31350ab9280078d1e5f12b249e0026"},
+ {file = "markupsafe-3.0.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f34c41761022dd093b4b6896d4810782ffbabe30f2d443ff5f083e0cbbb8c737"},
+ {file = "markupsafe-3.0.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:457a69a9577064c05a97c41f4e65148652db078a3a509039e64d3467b9e7ef97"},
+ {file = "markupsafe-3.0.3-cp314-cp314-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:e8afc3f2ccfa24215f8cb28dcf43f0113ac3c37c2f0f0806d8c70e4228c5cf4d"},
+ {file = "markupsafe-3.0.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:ec15a59cf5af7be74194f7ab02d0f59a62bdcf1a537677ce67a2537c9b87fcda"},
+ {file = "markupsafe-3.0.3-cp314-cp314-musllinux_1_2_riscv64.whl", hash = "sha256:0eb9ff8191e8498cca014656ae6b8d61f39da5f95b488805da4bb029cccbfbaf"},
+ {file = "markupsafe-3.0.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:2713baf880df847f2bece4230d4d094280f4e67b1e813eec43b4c0e144a34ffe"},
+ {file = "markupsafe-3.0.3-cp314-cp314-win32.whl", hash = "sha256:729586769a26dbceff69f7a7dbbf59ab6572b99d94576a5592625d5b411576b9"},
+ {file = "markupsafe-3.0.3-cp314-cp314-win_amd64.whl", hash = "sha256:bdc919ead48f234740ad807933cdf545180bfbe9342c2bb451556db2ed958581"},
+ {file = "markupsafe-3.0.3-cp314-cp314-win_arm64.whl", hash = "sha256:5a7d5dc5140555cf21a6fefbdbf8723f06fcd2f63ef108f2854de715e4422cb4"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:1353ef0c1b138e1907ae78e2f6c63ff67501122006b0f9abad68fda5f4ffc6ab"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:1085e7fbddd3be5f89cc898938f42c0b3c711fdcb37d75221de2666af647c175"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1b52b4fb9df4eb9ae465f8d0c228a00624de2334f216f178a995ccdcf82c4634"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:fed51ac40f757d41b7c48425901843666a6677e3e8eb0abcff09e4ba6e664f50"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:f190daf01f13c72eac4efd5c430a8de82489d9cff23c364c3ea822545032993e"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:e56b7d45a839a697b5eb268c82a71bd8c7f6c94d6fd50c3d577fa39a9f1409f5"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-musllinux_1_2_riscv64.whl", hash = "sha256:f3e98bb3798ead92273dc0e5fd0f31ade220f59a266ffd8a4f6065e0a3ce0523"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:5678211cb9333a6468fb8d8be0305520aa073f50d17f089b5b4b477ea6e67fdc"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-win32.whl", hash = "sha256:915c04ba3851909ce68ccc2b8e2cd691618c4dc4c4232fb7982bca3f41fd8c3d"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-win_amd64.whl", hash = "sha256:4faffd047e07c38848ce017e8725090413cd80cbc23d86e55c587bf979e579c9"},
+ {file = "markupsafe-3.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:32001d6a8fc98c8cb5c947787c5d08b0a50663d139f1305bac5885d98d9b40fa"},
+ {file = "markupsafe-3.0.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:15d939a21d546304880945ca1ecb8a039db6b4dc49b2c5a400387cdae6a62e26"},
+ {file = "markupsafe-3.0.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f71a396b3bf33ecaa1626c255855702aca4d3d9fea5e051b41ac59a9c1c41edc"},
+ {file = "markupsafe-3.0.3-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0f4b68347f8c5eab4a13419215bdfd7f8c9b19f2b25520968adfad23eb0ce60c"},
+ {file = "markupsafe-3.0.3-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e8fc20152abba6b83724d7ff268c249fa196d8259ff481f3b1476383f8f24e42"},
+ {file = "markupsafe-3.0.3-cp39-cp39-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:949b8d66bc381ee8b007cd945914c721d9aba8e27f71959d750a46f7c282b20b"},
+ {file = "markupsafe-3.0.3-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:3537e01efc9d4dccdf77221fb1cb3b8e1a38d5428920e0657ce299b20324d758"},
+ {file = "markupsafe-3.0.3-cp39-cp39-musllinux_1_2_riscv64.whl", hash = "sha256:591ae9f2a647529ca990bc681daebdd52c8791ff06c2bfa05b65163e28102ef2"},
+ {file = "markupsafe-3.0.3-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:a320721ab5a1aba0a233739394eb907f8c8da5c98c9181d1161e77a0c8e36f2d"},
+ {file = "markupsafe-3.0.3-cp39-cp39-win32.whl", hash = "sha256:df2449253ef108a379b8b5d6b43f4b1a8e81a061d6537becd5582fba5f9196d7"},
+ {file = "markupsafe-3.0.3-cp39-cp39-win_amd64.whl", hash = "sha256:7c3fb7d25180895632e5d3148dbdc29ea38ccb7fd210aa27acbd1201a1902c6e"},
+ {file = "markupsafe-3.0.3-cp39-cp39-win_arm64.whl", hash = "sha256:38664109c14ffc9e7437e86b4dceb442b0096dfe3541d7864d9cbe1da4cf36c8"},
+ {file = "markupsafe-3.0.3.tar.gz", hash = "sha256:722695808f4b6457b320fdc131280796bdceb04ab50fe1795cd540799ebe1698"},
+]
+
[[package]]
name = "mccabe"
version = "0.7.0"
@@ -2115,9 +2303,10 @@ twisted = ["twisted"]
name = "prompt-toolkit"
version = "3.0.52"
description = "Library for building powerful interactive command lines in Python"
-optional = false
+optional = true
python-versions = ">=3.8"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "prompt_toolkit-3.0.52-py3-none-any.whl", hash = "sha256:9aac639a3bbd33284347de5ad8d68ecc044b91a762dc39b7c21095fcd6a19955"},
{file = "prompt_toolkit-3.0.52.tar.gz", hash = "sha256:28cde192929c8e7321de85de1ddbe736f1375148b02f2e17edd840042b1be855"},
@@ -2644,22 +2833,6 @@ psutil = ["psutil (>=3.0)"]
setproctitle = ["setproctitle"]
testing = ["filelock"]

-[[package]]
-name = "python-crontab"
-version = "3.3.0"
-description = "Python Crontab API"
-optional = false
-python-versions = "*"
-groups = ["main"]
-files = [
- {file = "python_crontab-3.3.0-py3-none-any.whl", hash = "sha256:739a778b1a771379b75654e53fd4df58e5c63a9279a63b5dfe44c0fcc3ee7884"},
- {file = "python_crontab-3.3.0.tar.gz", hash = "sha256:007c8aee68dddf3e04ec4dce0fac124b93bd68be7470fc95d2a9617a15de291b"},
-]
-
-[package.extras]
-cron-description = ["cron-descriptor"]
-cron-schedule = ["croniter"]
-
[[package]]
name = "python-dateutil"
version = "2.9.0.post0"
@@ -3540,6 +3713,24 @@ files = [
{file = "tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9"},
]

+[[package]]
+name = "tzlocal"
+version = "5.3.1"
+description = "tzinfo object for the local timezone"
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+ {file = "tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d"},
+ {file = "tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd"},
+]
+
+[package.dependencies]
+tzdata = {version = "*", markers = "platform_system == \"Windows\""}
+
+[package.extras]
+devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"]
+
[[package]]
name = "uritemplate"
version = "4.2.0"
@@ -3589,9 +3780,10 @@ zstd = ["zstandard (>=0.18.0)"]
name = "vine"
version = "5.1.0"
description = "Python promises."
-optional = false
+optional = true
python-versions = ">=3.6"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "vine-5.1.0-py3-none-any.whl", hash = "sha256:40fdf3c48b2cfe1c38a49e9ae2da6fda88e4794c810050a728bd7413811fb1dc"},
{file = "vine-5.1.0.tar.gz", hash = "sha256:8b62e981d35c41049211cf62a0a1242d8c1ee9bd15bb196ce38aefd6799e61e0"},
@@ -3601,9 +3793,10 @@ files = [
name = "wcwidth"
version = "0.2.13"
description = "Measures the displayed width of unicode strings in a terminal"
-optional = false
+optional = true
python-versions = "*"
groups = ["main"]
+markers = "extra == \"dev\""
files = [
{file = "wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859"},
{file = "wcwidth-0.2.13.tar.gz", hash = "sha256:72ea0c06399eb286d978fdedb6923a9eb47e1c486ce63e9b4e64fc18303972b5"},
@@ -3649,4 +3842,4 @@ dev = ["django-extensions", "drf-spectacular-sidecar", "flower", "pip-audit", "p
[metadata]
lock-version = "2.1"
python-versions = ">=3.13,<4.0"
-content-hash = "b02eb7943a2f100953cbb14c779c0d3ac8c65e26bbbc4c5bf8e48f401680ed7e"
+content-hash = "dd6f59bb1028c7d2f4122007674da97242251b060608296f1b2712d7a41f9f44"
diff --git a/src/backend/pyproject.toml b/src/backend/pyproject.toml
index e14893d61..fbf9068a9 100644
--- a/src/backend/pyproject.toml
+++ b/src/backend/pyproject.toml
@@ -5,7 +5,7 @@
[project]
name = "messages-backend"
version = "0.0.1"
-authors = [{ "name" = "ANCT", "email" = "suiteterritoriale@anct.gouv.fr" }]
+authors = [{ "name" = "ANCT", "email" = "contact@suite.anct.gouv.fr" }]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Framework :: Django",
@@ -26,12 +26,13 @@ requires-python = ">=3.13,<4.0"
dependencies = [
"boto3==1.40.43",
"botocore==1.40.43",
- "celery[redis]==5.5.2",
"cryptography==45.0.5",
"dj-database-url==2.3.0",
"django==5.1.13",
- "django-celery-beat==2.8.0",
- "django-celery-results==2.6.0",
+ "django-dramatiq==0.14.0",
+ "dramatiq[redis]==1.18.0",
+ "dramatiq-dashboard@git+https://github.com/dbowring/dramatiq_dashboard@128ee38e2b5efc36c4ae964a934cffc41baaf896",
+ "dramatiq-crontab[sentry]==1.0.12",
"django-configurations==2.5.1",
"django-cors-headers==4.6.0",
"django-countries==7.6.1",