diff --git a/Makefile b/Makefile index aeb972498..8753203ee 100644 --- a/Makefile +++ b/Makefile @@ -324,8 +324,9 @@ back-poetry-tree: ## show dependencies as a tree @$(COMPOSE) run --rm --build backend-dev pipdeptree .PHONY: back-poetry-tree -pip-audit: ## check the dependencies - @$(COMPOSE) run --rm --no-deps -e HOME=/tmp --build backend-dev pip-audit +pip-audit: ## check the dependencies + # https://github.com/pypa/pip/issues/13607 + @$(COMPOSE) run --rm --no-deps -e HOME=/tmp --build backend-dev pip-audit --ignore-vuln GHSA-4xh5-x5gv-qwph .PHONY: pip-audit collectstatic: ## collect static files diff --git a/Procfile b/Procfile index 13c5fdbb6..b40827f0f 100644 --- a/Procfile +++ b/Procfile @@ -1,3 +1,4 @@ web: bin/scalingo_run_web -worker: celery -A messages.celery_app worker --task-events --beat -l INFO -c $CELERY_CONCURRENCY -Q celery,default +worker: python manage.py worker +scheduler: python manage.py crontab postdeploy: python manage.py migrate \ No newline at end of file diff --git a/compose.yaml b/compose.yaml index df59e2bd4..1103f16f6 100644 --- a/compose.yaml +++ b/compose.yaml @@ -159,14 +159,14 @@ services: target: poetry pull_policy: build - celery-dev: + worker-dev: build: context: src/backend target: runtime-dev args: DOCKER_USER: ${DOCKER_USER:-1000} user: ${DOCKER_USER:-1000} - command: ["celery", "-A", "messages.celery_app", "worker", "-l", "DEBUG", "-Q", "celery,default"] + command: ["python", "manage.py", "worker", "-p", "2", "-t", "2"] environment: - DJANGO_CONFIGURATION=Development env_file: @@ -177,39 +177,27 @@ services: - ./data/static:/data/static depends_on: - backend-dev + - redis - celery-ui: + scheduler-dev: build: context: src/backend target: runtime-dev args: DOCKER_USER: ${DOCKER_USER:-1000} user: ${DOCKER_USER:-1000} - depends_on: - - redis + command: ["python", "manage.py", "crontab"] environment: - - FLOWER_UNAUTHENTICATED_API=true - DJANGO_CONFIGURATION=Development env_file: - env.d/development/backend.defaults - env.d/development/backend.local volumes: - ./src/backend:/app - ports: - - "8903:8803" - command: celery -A messages.celery_app flower --port=8803 - - # nginx: - # image: nginx:1.25 - # ports: - # - "8083:8083" - # volumes: - # - ./docker/files/development/etc/nginx/conf.d:/etc/nginx/conf.d:ro - # depends_on: - # - keycloak - # - backend-dev - # - mta-in - # - mta-out + - ./data/static:/data/static + depends_on: + - backend-dev + - redis frontend-dev: user: "${DOCKER_USER:-1000}" diff --git a/docs/env.md b/docs/env.md index 9a7d9c7fb..c4332a4e5 100644 --- a/docs/env.md +++ b/docs/env.md @@ -60,7 +60,7 @@ The application uses a new environment file structure with `.defaults` and `.loc | Variable | Default | Description | Required | |----------|---------|-------------|----------| | `REDIS_URL` | `redis://redis:6379` | Redis connection URL (internal) | Optional | -| `CELERY_BROKER_URL` | `redis://redis:6379` | Celery message broker URL (internal) | Optional | +| `DRAMATIQ_BROKER_URL` | `redis://redis:6379` | Dramatiq message broker URL (internal) | Optional | | `CACHES_DEFAULT_TIMEOUT` | `30` | Default cache timeout in seconds | Optional | **Note**: For external Redis access, use `localhost:8913`. For internal container communication, use `redis:6379`. diff --git a/env.d/development/backend.defaults b/env.d/development/backend.defaults index 7d52973c2..404f63895 100644 --- a/env.d/development/backend.defaults +++ b/env.d/development/backend.defaults @@ -27,6 +27,10 @@ METRICS_API_KEY=ExampleMetricsApiKey # Python PYTHONPATH=/app +# Dramatiq +DRAMATIQ_BROKER_URL=redis://redis:6379 +DRAMATIQ_CRONTAB_REDIS_URL=redis://redis:6379/0 + # Messages settings # Mail diff --git a/src/backend/Dockerfile b/src/backend/Dockerfile index b4e66dbc4..ac62a4a6c 100644 --- a/src/backend/Dockerfile +++ b/src/backend/Dockerfile @@ -1,8 +1,8 @@ # https://hub.docker.com/_/python -FROM python:3.13.7-slim-trixie AS base +FROM python:3.13.9-slim-trixie AS base # Bump this to force an update of the apt repositories -ENV MIN_UPDATE_DATE="2025-09-02" +ENV MIN_UPDATE_DATE="2025-10-20" RUN apt-get update && apt-get upgrade -y \ && rm -rf /var/lib/apt/lists/* @@ -22,7 +22,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ ENV POETRY_NO_INTERACTION=1 ENV POETRY_VIRTUALENVS_CREATE=0 ENV POETRY_VIRTUALENVS_OPTIONS_NO_PIP=1 -ENV POETRY_VERSION=2.1.4 +ENV POETRY_VERSION=2.2.1 RUN python -m pip install poetry==${POETRY_VERSION} diff --git a/src/backend/core/admin.py b/src/backend/core/admin.py index d8bb18ab7..694190764 100644 --- a/src/backend/core/admin.py +++ b/src/backend/core/admin.py @@ -1,15 +1,20 @@ """Admin classes and registrations for core app.""" from django.contrib import admin, messages +from django.contrib.admin.views.decorators import staff_member_required from django.contrib.auth import admin as auth_admin from django.core.files.storage import storages +from django.http import HttpResponse from django.shortcuts import redirect from django.template.response import TemplateResponse -from django.urls import path +from django.urls import path, reverse from django.utils.html import escape, format_html from django.utils.text import slugify from django.utils.translation import gettext_lazy as _ +import dramatiq +import dramatiq_dashboard + from core.api.utils import get_file_key from core.services.importer import ImportService @@ -694,3 +699,75 @@ def get_html_body(self, obj): return obj.html_body get_html_body.short_description = "HTML Body" + + +# Dramatiq Dashboard Integration +@staff_member_required +def dramatiq_dashboard_view(request): + """Serve the Dramatiq dashboard for staff users only.""" + # Get the broker from django-dramatiq + broker = dramatiq.get_broker() + + # Create the dashboard app + dashboard_app = dramatiq_dashboard.DashboardApp(broker=broker, prefix="") + + # Create a WSGI environment + environ = { + "REQUEST_METHOD": request.method, + "PATH_INFO": request.path_info, + "QUERY_STRING": request.META.get("QUERY_STRING", ""), + "CONTENT_TYPE": request.META.get("CONTENT_TYPE", ""), + "CONTENT_LENGTH": request.META.get("CONTENT_LENGTH", ""), + "HTTP_HOST": request.META.get("HTTP_HOST", ""), + "SERVER_NAME": request.META.get("SERVER_NAME", ""), + "SERVER_PORT": request.META.get("SERVER_PORT", ""), + "wsgi.url_scheme": request.scheme, + "wsgi.input": request, + "wsgi.errors": request, + "wsgi.version": (1, 0), + "wsgi.multithread": True, + "wsgi.multiprocess": False, + "wsgi.run_once": False, + } + + # Add HTTP headers + for key, value in request.META.items(): + if key.startswith("HTTP_"): + environ[key] = value + + # Call the dashboard app + def start_response(status, response_headers): # pylint: disable=unused-argument + # This will be called by the WSGI app + pass + + # Get the response from the dashboard + response_body = dashboard_app(environ, start_response) + + # Create Django response + response = HttpResponse(b"".join(response_body)) + response.status_code = 200 + response["Content-Type"] = "text/html; charset=utf-8" + + return response + + +class CoreAdminSite(admin.AdminSite): + """Custom admin site with Dramatiq dashboard integration.""" + + def get_urls(self): + """Add Dramatiq dashboard URL to admin URLs.""" + urls = super().get_urls() + custom_urls = [ + path("dramatiq/", dramatiq_dashboard_view, name="dramatiq_dashboard"), + ] + return custom_urls + urls + + def index(self, request, extra_context=None): + """Add Dramatiq dashboard link to admin index.""" + extra_context = extra_context or {} + extra_context["dramatiq_dashboard_url"] = reverse("admin:dramatiq_dashboard") + return super().index(request, extra_context) + + +# Create custom admin site instance +admin_site = CoreAdminSite(name="admin") diff --git a/src/backend/core/api/viewsets/import_message.py b/src/backend/core/api/viewsets/import_message.py index 7f127e099..67cd9a0ac 100644 --- a/src/backend/core/api/viewsets/import_message.py +++ b/src/backend/core/api/viewsets/import_message.py @@ -43,7 +43,7 @@ class ImportViewSet(viewsets.ViewSet): request=ImportFileSerializer, responses={ 202: OpenApiResponse( - description="Import started. Returns Celery task ID for tracking.", + description="Import started. Returns task ID for tracking.", response={ "type": "object", "properties": { @@ -96,7 +96,7 @@ def import_file(self, request): request=ImportIMAPSerializer, responses={ 202: OpenApiResponse( - description="IMAP import started. Returns Celery task ID for tracking the import progress.", + description="IMAP import started. Returns task ID for tracking the import progress.", response={ "type": "object", "properties": { diff --git a/src/backend/core/api/viewsets/send.py b/src/backend/core/api/viewsets/send.py index 55e98e645..d9b0ed4d9 100644 --- a/src/backend/core/api/viewsets/send.py +++ b/src/backend/core/api/viewsets/send.py @@ -120,7 +120,7 @@ def post(self, request): ) # Launch async task for sending the message - task = send_message_task.delay(str(message.id), must_archive=must_archive) + task = send_message_task.send(str(message.id), must_archive=must_archive) # --- Finalize --- # Message state should be updated by prepare_outbound_message/send_message @@ -130,4 +130,4 @@ def post(self, request): # Update thread stats after un-drafting message.thread.update_stats() - return Response({"task_id": task.id}, status=status.HTTP_200_OK) + return Response({"task_id": task.message_id}, status=status.HTTP_200_OK) diff --git a/src/backend/core/api/viewsets/task.py b/src/backend/core/api/viewsets/task.py index e55e4fca1..3af4c6f8f 100644 --- a/src/backend/core/api/viewsets/task.py +++ b/src/backend/core/api/viewsets/task.py @@ -1,9 +1,9 @@ -"""API ViewSet for Celery task status.""" +"""API ViewSet for asynchronous task statuses.""" import logging -from celery import states as celery_states -from celery.result import AsyncResult +import dramatiq +from dramatiq.results import ResultFailure, ResultMissing, ResultTimeout from drf_spectacular.utils import ( OpenApiExample, extend_schema, @@ -14,10 +14,13 @@ from rest_framework.response import Response from rest_framework.views import APIView -from messages.celery_app import app as celery_app +from core.utils import get_task_progress logger = logging.getLogger(__name__) +# Map Dramatiq states to Celery-like states for frontend compatibility +DRAMATIQ_STATES = ["PENDING", "SUCCESS", "FAILURE", "RETRY", "REJECTED", "PROGRESS"] + @extend_schema( tags=["tasks"], @@ -34,9 +37,7 @@ 200: inline_serializer( name="TaskStatusResponse", fields={ - "status": drf_serializers.ChoiceField( - choices=sorted({*celery_states.ALL_STATES, "PROGRESS"}) - ), + "status": drf_serializers.ChoiceField(choices=sorted(DRAMATIQ_STATES)), "result": drf_serializers.JSONField(allow_null=True), "error": drf_serializers.CharField(allow_null=True), }, @@ -59,38 +60,65 @@ ], ) class TaskDetailView(APIView): - """View to retrieve the status of a Celery task.""" + """View to retrieve the status of a task.""" permission_classes = [permissions.IsAuthenticated] def get(self, request, task_id): - """Get the status of a Celery task.""" - - task_result = AsyncResult(task_id, app=celery_app) + """Get the status of a task.""" + # Try to fetch a result from Dramatiq's result backend + broker = dramatiq.get_broker() + result_backend = broker.get_results_backend() - # By default unknown tasks will be in PENDING. There is no reliable - # way to check if a task exists or not with Celery. - # https://github.com/celery/celery/issues/3596#issuecomment-262102185 + if result_backend is not None: + try: + # Retrieve a Message for this task id from the backend, then get result + # See Dramatiq results API: message.get_result(...) + message = result_backend.get_message(task_id) + result = message.get_result(backend=result_backend, block=False) + return Response( + { + "status": "SUCCESS", + "result": result, + "error": None, + } + ) + except ResultMissing: + # No result yet; fall through to progress/pending logic + pass + except ResultFailure as exc: + return Response( + { + "status": "FAILURE", + "result": None, + "error": str(exc), + } + ) + except ResultTimeout as exc: + # Treat timeouts as pending + logger.debug("Result timeout for task %s: %s", task_id, exc) - # Prepare the response data - result_data = { - "status": task_result.status, - "result": None, - "error": None, - } + # Check if we have progress data for this task + progress_data = get_task_progress(task_id) - # If the result is a dict with status/result/error, unpack and propagate status - if isinstance(task_result.result, dict) and set(task_result.result.keys()) >= { - "status", - "result", - "error", - }: - result_data["status"] = task_result.result["status"] - result_data["result"] = task_result.result["result"] - result_data["error"] = task_result.result["error"] - else: - result_data["result"] = task_result.result - if task_result.state == "PROGRESS" and task_result.info: - result_data.update(task_result.info) + if progress_data: + # Task is in progress + return Response( + { + "status": "PROGRESS", + "result": None, + "error": None, + "progress": progress_data.get("progress"), + "message": progress_data.get("metadata", {}).get("message"), + "timestamp": progress_data.get("timestamp"), + } + ) - return Response(result_data) + # Default to pending when no result and no progress + return Response( + { + "status": "PENDING", + "result": None, + "error": None, + } + ) diff --git a/src/backend/core/management/commands/run_task.py b/src/backend/core/management/commands/run_task.py index dc9f2f458..5be7d634c 100644 --- a/src/backend/core/management/commands/run_task.py +++ b/src/backend/core/management/commands/run_task.py @@ -1,8 +1,8 @@ """ -Management command to run arbitrary Celery tasks synchronously. +Management command to run arbitrary Dramatiq actors synchronously. -This command provides a Django interface to run Celery tasks with the same -CLI flags as the main Celery CLI, but executes them synchronously instead +This command provides a Django interface to run Dramatiq actors with the same +CLI flags as the main Dramatiq CLI, but executes them synchronously instead of queuing them as background tasks. """ @@ -16,21 +16,21 @@ class Command(BaseCommand): - """Run arbitrary Celery tasks synchronously.""" + """Run arbitrary Dramatiq actors synchronously.""" help = """ - Run arbitrary Celery tasks synchronously. + Run arbitrary Dramatiq actors synchronously. Examples: - python manage.py run_task fetch_service_metrics - python manage.py run_task fetch_metrics_for_service --pargs '["123e4567-e89b-12d3-a456-426614174000"]' - python manage.py run_task fetch_service_metrics --kwargs '{"debug": true}' - python manage.py run_task other_app.tasks.some_task + python manage.py run_task send_message_task + python manage.py run_task send_message_task --pargs '["123e4567-e89b-12d3-a456-426614174000"]' + python manage.py run_task send_message_task --kwargs '{"must_archive": true}' + python manage.py run_task core.mda.tasks.send_message_task """ def add_arguments(self, parser): """Add command line arguments.""" - parser.add_argument("task_name", help="Name of the Celery task to run") + parser.add_argument("task_name", help="Name of the Dramatiq actor to run") # Task execution options parser.add_argument( @@ -76,7 +76,7 @@ def handle(self, *args, **options): try: # Execute task synchronously - result = task_func.apply(args=task_args, kwargs=kwargs) + result = task_func(*task_args, **kwargs) # Output result if options["json"]: @@ -92,8 +92,18 @@ def handle(self, *args, **options): def _get_task_function(self, task_name: str): """Get the task function by name using dynamic imports.""" try: - # Try to import from core.tasks first - tasks_module = importlib.import_module("core.tasks") + # Try to import from core.mda.tasks first + tasks_module = importlib.import_module("core.mda.tasks") + if hasattr(tasks_module, task_name): + return getattr(tasks_module, task_name) + + # Try core.services.search.tasks + tasks_module = importlib.import_module("core.services.search.tasks") + if hasattr(tasks_module, task_name): + return getattr(tasks_module, task_name) + + # Try core.services.importer.tasks + tasks_module = importlib.import_module("core.services.importer.tasks") if hasattr(tasks_module, task_name): return getattr(tasks_module, task_name) @@ -105,7 +115,7 @@ def _get_task_function(self, task_name: str): return getattr(module, func_name) self.stdout.write( - self.style.WARNING(f"Task '{task_name}' not found in core.tasks module") + self.style.WARNING(f"Task '{task_name}' not found in any tasks module") ) return None diff --git a/src/backend/core/management/commands/search_reindex.py b/src/backend/core/management/commands/search_reindex.py deleted file mode 100644 index a6a9d51e6..000000000 --- a/src/backend/core/management/commands/search_reindex.py +++ /dev/null @@ -1,157 +0,0 @@ -"""Management command to reindex content in OpenSearch.""" - -import uuid - -from django.core.management.base import BaseCommand, CommandError - -from core import models -from core.services.search import create_index_if_not_exists -from core.services.search.tasks import ( - _reindex_all_base, - reindex_all, - reindex_mailbox_task, - reindex_thread_task, -) - - -class Command(BaseCommand): - """Reindex content in OpenSearch.""" - - help = "Reindex content in OpenSearch" - - def add_arguments(self, parser): - """Add command arguments.""" - # Define a mutually exclusive group for the reindex scope - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument( - "--all", - action="store_true", - help="Reindex all threads and messages", - ) - group.add_argument( - "--thread", - type=str, - help="Reindex a specific thread by ID", - ) - group.add_argument( - "--mailbox", - type=str, - help="Reindex all threads and messages in a specific mailbox by ID", - ) - - # Async option - parser.add_argument( - "--async", - action="store_true", - help="Run task asynchronously", - dest="async_mode", - ) - - # Whether to recreate the index - parser.add_argument( - "--recreate-index", - action="store_true", - help="Recreate the index before reindexing", - ) - - def handle(self, *args, **options): - """Execute the command.""" - # Ensure index exists - self.stdout.write("Ensuring OpenSearch index exists...") - create_index_if_not_exists() - - # Handle reindexing based on scope - if options["all"]: - self._reindex_all(options["async_mode"]) - elif options["thread"]: - self._reindex_thread(options["thread"], options["async_mode"]) - elif options["mailbox"]: - self._reindex_mailbox(options["mailbox"], options["async_mode"]) - - def _reindex_all(self, async_mode): - """Reindex all threads and messages.""" - self.stdout.write("Reindexing all threads and messages...") - - if async_mode: - task = reindex_all.delay() - self.stdout.write( - self.style.SUCCESS(f"Reindexing task scheduled (ID: {task.id})") - ) - else: - # For synchronous execution, use the base function directly - def update_progress(current, total, success_count, failure_count): - """Update progress in the console.""" - self.stdout.write( - f"Progress: {current}/{total} threads processed " - f"({success_count} succeeded, {failure_count} failed)" - ) - - result = _reindex_all_base(update_progress) - self.stdout.write( - self.style.SUCCESS( - f"Reindexing completed: {result.get('success_count', 0)} succeeded, " - f"{result.get('failure_count', 0)} failed" - ) - ) - if result.get("failure_count", 0) > 0: - return 1 - - def _reindex_thread(self, thread_id, async_mode): - """Reindex a specific thread and its messages.""" - try: - thread_uuid = uuid.UUID(thread_id) - # Verify thread exists - models.Thread.objects.get(id=thread_uuid) - except ValueError as e: - raise CommandError(f"Invalid thread ID: {thread_id}") from e - except models.Thread.DoesNotExist as e: - raise CommandError(f"Thread with ID {thread_id} does not exist") from e - - self.stdout.write(f"Reindexing thread {thread_id}...") - - if async_mode: - task = reindex_thread_task.delay(thread_id) - self.stdout.write( - self.style.SUCCESS(f"Reindexing task scheduled (ID: {task.id})") - ) - else: - result = reindex_thread_task(thread_id) - if result.get("success", False): - self.stdout.write( - self.style.SUCCESS(f"Thread {thread_id} indexed successfully") - ) - else: - self.stdout.write( - self.style.ERROR( - f"Failed to index thread {thread_id}: {result.get('error', '')}" - ) - ) - return 1 - - def _reindex_mailbox(self, mailbox_id, async_mode): - """Reindex all threads and messages in a specific mailbox.""" - try: - mailbox_uuid = uuid.UUID(mailbox_id) - mailbox = models.Mailbox.objects.get(id=mailbox_uuid) - except ValueError as e: - raise CommandError(f"Invalid mailbox ID: {mailbox_id}") from e - except models.Mailbox.DoesNotExist as e: - raise CommandError(f"Mailbox with ID {mailbox_id} does not exist") from e - - self.stdout.write(f"Reindexing threads for mailbox {mailbox}...") - - if async_mode: - task = reindex_mailbox_task.delay(mailbox_id) - self.stdout.write( - self.style.SUCCESS(f"Reindexing task scheduled (ID: {task.id})") - ) - else: - result = reindex_mailbox_task(mailbox_id) - self.stdout.write( - self.style.SUCCESS( - f"Reindexing completed: {result.get('success_count', 0)} succeeded, " - f"{result.get('failure_count', 0)} failed" - ) - ) - if result.get("failure_count", 0) > 0: - return 1 diff --git a/src/backend/core/management/commands/worker.py b/src/backend/core/management/commands/worker.py new file mode 100644 index 000000000..db6ac8d5c --- /dev/null +++ b/src/backend/core/management/commands/worker.py @@ -0,0 +1,10 @@ +from django_dramatiq.management.commands.rundramatiq import ( + Command as RunDramatiqCommand, +) + + +class Command(RunDramatiqCommand): + def discover_tasks_modules(self): + tasks_modules = super().discover_tasks_modules() + tasks_modules[0] = "core.worker_setup" + return tasks_modules diff --git a/src/backend/core/mda/outbound.py b/src/backend/core/mda/outbound.py index 55d1a63dc..4d29f5b56 100644 --- a/src/backend/core/mda/outbound.py +++ b/src/backend/core/mda/outbound.py @@ -229,7 +229,7 @@ def prepare_outbound_message( def send_message(message: models.Message, force_mta_out: bool = False): """Send an existing Message, internally or externally. - This part is called asynchronously from the celery worker. + This part is called asynchronously from the worker. """ # Refuse to send messages that are draft or not senders diff --git a/src/backend/core/mda/tasks.py b/src/backend/core/mda/tasks.py index 4e948aec7..5d386f762 100644 --- a/src/backend/core/mda/tasks.py +++ b/src/backend/core/mda/tasks.py @@ -5,81 +5,74 @@ from django.db.models import Q from django.utils import timezone -from celery.utils.log import get_task_logger +import dramatiq +from dramatiq_crontab import cron from core import models from core.enums import MessageDeliveryStatusChoices from core.mda.outbound import send_message from core.mda.selfcheck import run_selfcheck +from core.utils import register_task, set_task_progress -from messages.celery_app import app as celery_app +logger = dramatiq.get_logger(__name__) -logger = get_task_logger(__name__) - -@celery_app.task(bind=True) -def send_message_task(self, message_id, force_mta_out=False, must_archive=False): +@register_task +def send_message_task(message_id, force_mta_out=False, must_archive=False): """Send a message asynchronously. Args: message_id: The ID of the message to send force_mta_out: Whether to force sending via MTA + must_archive: Whether to archive the thread after sending Returns: dict: A dictionary with success status and info """ - try: - message = ( - models.Message.objects.select_related("thread", "sender") - .prefetch_related("recipients__contact") - .get(id=message_id) - ) + set_task_progress(0, {"message": "Starting message send"}) - send_message(message, force_mta_out) + message = ( + models.Message.objects.select_related("thread", "sender") + .prefetch_related("recipients__contact") + .get(id=message_id) + ) - # Update task state with progress information - self.update_state( - state="SUCCESS", - meta={ - "status": "completed", # TODO fetch recipients statuses - "message_id": str(message_id), - "success": True, - }, - ) + set_task_progress(25, {"message": "Message loaded, sending..."}) - # If requested, archive the whole thread after sending - if must_archive: - try: - thread = message.thread - models.Message.objects.filter(thread=thread).update( - is_archived=True, archived_at=timezone.now() - ) - thread.update_stats() - except Exception as e: - # Not critical, just log the error - logger.exception( - "Error in send_message_task when archiving thread %s after sending message %s: %s", - thread.id, - message_id, - e, - ) - - return { - "message_id": str(message_id), - "success": True, - } - # pylint: disable=broad-exception-caught - except Exception as e: - logger.exception("Error in send_message_task for message %s: %s", message_id, e) - self.update_state( - state="FAILURE", - meta={"status": "failed", "message_id": str(message_id), "error": str(e)}, - ) - raise + send_message(message, force_mta_out) + set_task_progress(75, {"message": "Message sent, processing archive..."}) -@celery_app.task(bind=True) -def selfcheck_task(self): + # If requested, archive the whole thread after sending + if must_archive: + try: + thread = message.thread + models.Message.objects.filter(thread=thread).update( + is_archived=True, archived_at=timezone.now() + ) + thread.update_stats() + set_task_progress(90, {"message": "Thread archived"}) + except Exception as e: + # Not critical, just log the error + logger.exception( + "Error in send_message_task when archiving thread %s after sending message %s: %s", + thread.id, + message_id, + e, + ) + + result = { + "message_id": str(message_id), + "success": True, + } + + set_task_progress(100, {"message": "Message sent successfully"}) + return result + + +@cron("0 */6 * * *") # Every 6 hours +@register_task +def selfcheck_task(): """Run a selfcheck of the mail delivery system. This task performs an end-to-end test of the mail delivery pipeline: @@ -94,33 +87,13 @@ def selfcheck_task(self): Returns: dict: A dictionary with success status, timings, and metrics """ - try: - result = run_selfcheck() - - # Update task state with progress information - self.update_state( - state="SUCCESS", - meta={ - "status": "completed", - "success": result["success"], - "send_time": result["send_time"], - "reception_time": result["reception_time"], - }, - ) + result = run_selfcheck() + return result - return result - # pylint: disable=broad-exception-caught - except Exception as e: - logger.exception("Error in selfcheck_task: %s", e) - self.update_state( - state="FAILURE", - meta={"status": "failed", "error": str(e)}, - ) - raise - -@celery_app.task(bind=True) -def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=100): +@cron("*/5 * * * *") # Every 5 minutes +@register_task +def retry_messages_task(message_id=None, force_mta_out=False, batch_size=100): """Retry sending messages with retryable recipients (respects retry timing). Args: @@ -131,6 +104,9 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1 Returns: dict: A dictionary with task status and results """ + + set_task_progress(0, {"message": "Finding messages to retry"}) + # Get messages to process if message_id: # Single message mode @@ -169,7 +145,7 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1 total_messages = len(messages_to_process) if total_messages == 0: - return { + result = { "success": True, "total_messages": 0, "processed_messages": 0, @@ -177,6 +153,16 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1 "error_count": 0, "message": "No messages ready for retry", } + set_task_progress(100, {"message": "No messages to retry"}) + return result + + set_task_progress( + 10, + { + "message": f"Found {total_messages} messages to retry", + "total_messages": total_messages, + }, + ) # Process messages in batches processed_count = 0 @@ -186,19 +172,20 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1 for batch_start in range(0, total_messages, batch_size): batch_messages = messages_to_process[batch_start : batch_start + batch_size] - # Update progress for bulk operations - if not message_id: - self.update_state( - state="PROGRESS", - meta={ - "current_batch": batch_start // batch_size + 1, - "total_batches": (total_messages + batch_size - 1) // batch_size, - "processed_messages": processed_count, - "total_messages": total_messages, - "success_count": success_count, - "error_count": error_count, - }, - ) + # Update progress for batch processing + progress_percentage = min(10 + (batch_start / total_messages) * 80, 90) + set_task_progress( + int(progress_percentage), + { + "message": f"Processing batch {batch_start // batch_size + 1}", + "current_batch": batch_start // batch_size + 1, + "total_batches": (total_messages + batch_size - 1) // batch_size, + "processed_messages": processed_count, + "total_messages": total_messages, + "success_count": success_count, + "error_count": error_count, + }, + ) for message in batch_messages: try: @@ -227,7 +214,7 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1 # Return appropriate result format if message_id: - return { + result = { "success": True, "message_id": str(message_id), "recipients_processed": success_count, @@ -235,11 +222,19 @@ def retry_messages_task(self, message_id=None, force_mta_out=False, batch_size=1 "success_count": success_count, "error_count": error_count, } + else: + result = { + "success": True, + "total_messages": total_messages, + "processed_messages": processed_count, + "success_count": success_count, + "error_count": error_count, + } - return { - "success": True, - "total_messages": total_messages, - "processed_messages": processed_count, - "success_count": success_count, - "error_count": error_count, - } + set_task_progress( + 100, + { + "message": f"Retry completed: {success_count} succeeded, {error_count} failed" + }, + ) + return result diff --git a/src/backend/core/services/dns/tasks.py b/src/backend/core/services/dns/tasks.py deleted file mode 100644 index a933cc4da..000000000 --- a/src/backend/core/services/dns/tasks.py +++ /dev/null @@ -1,17 +0,0 @@ -"""DNS tasks.""" - -# pylint: disable=unused-argument, broad-exception-raised, broad-exception-caught, too-many-lines - -# @celery_app.task(bind=True) -# def check_maildomain_dns(self, maildomain_id): -# """Check if the DNS records for a mail domain are correct.""" - -# maildomain = models.MailDomain.objects.get(id=maildomain_id) -# expected_records = maildomain.get_expected_dns_records() -# for record in expected_records: -# res = dns.resolver.resolve( -# record["target"], record["type"], raise_on_no_answer=False, lifetime=10 -# ) -# print(res) -# print(record) -# return {"success": True} diff --git a/src/backend/core/services/importer/imap.py b/src/backend/core/services/importer/imap.py index 7b33e3aae..2be179ee7 100644 --- a/src/backend/core/services/importer/imap.py +++ b/src/backend/core/services/importer/imap.py @@ -3,6 +3,7 @@ import base64 import codecs import imaplib +import logging import re import socket import time @@ -10,12 +11,11 @@ from django.conf import settings -from celery.utils.log import get_task_logger - from core.mda.inbound import deliver_inbound_message from core.mda.rfc5322 import parse_email_message +from core.utils import set_task_progress -logger = get_task_logger(__name__) +logger = logging.getLogger(__name__) def decode_imap_utf7(s): @@ -396,7 +396,6 @@ def process_folder_messages( # pylint: disable=too-many-arguments message_list: List[bytes], recipient: Any, username: str, - task_instance: Any, success_count: int, failure_count: int, current_message: int, @@ -443,17 +442,17 @@ def process_folder_messages( # pylint: disable=too-many-arguments # Update task state after processing the message message_status = f"Processing message {current_message} of {total_messages}" - result = { - "message_status": message_status, - "total_messages": total_messages, - "success_count": success_count, - "failure_count": failure_count, - "type": "imap", - "current_message": current_message, - } - task_instance.update_state( - state="PROGRESS", - meta={"result": result, "error": None}, + + set_task_progress( + None, + { + "message_status": message_status, + "total_messages": total_messages, + "success_count": success_count, + "failure_count": failure_count, + "type": "imap", + "current_message": current_message, + }, ) return success_count, failure_count, current_message diff --git a/src/backend/core/services/importer/service.py b/src/backend/core/services/importer/service.py index 980a934c8..e89491125 100644 --- a/src/backend/core/services/importer/service.py +++ b/src/backend/core/services/importer/service.py @@ -69,25 +69,25 @@ def import_file( # Check MIME type for MBOX if unsafe_content_type in enums.MBOX_SUPPORTED_MIME_TYPES: # Process MBOX file asynchronously - task = process_mbox_file_task.delay(file_key, str(recipient.id)) - response_data = {"task_id": task.id, "type": "mbox"} + task = process_mbox_file_task.send(file_key, str(recipient.id)) + response_data = {"task_id": task.message_id, "type": "mbox"} if request: messages.info( request, f"Started processing MBOX file for recipient {recipient}. " - "This may take a while. You can check the status in the Celery task monitor.", + "This may take a while.", ) return True, response_data # Check MIME type for EML elif unsafe_content_type in enums.EML_SUPPORTED_MIME_TYPES: # Process EML file asynchronously - task = process_eml_file_task.delay(file_key, str(recipient.id)) - response_data = {"task_id": task.id, "type": "eml"} + task = process_eml_file_task.send(file_key, str(recipient.id)) + response_data = {"task_id": task.message_id, "type": "eml"} if request: messages.info( request, f"Started processing EML file for recipient {recipient}. " - "This may take a while. You can check the status in the Celery task monitor.", + "This may take a while.", ) return True, response_data except Exception as e: @@ -129,7 +129,7 @@ def import_imap( try: # Start the import task - task = import_imap_messages_task.delay( + task = import_imap_messages_task.send( imap_server=imap_server, imap_port=imap_port, username=username, @@ -137,12 +137,12 @@ def import_imap( use_ssl=use_ssl, recipient_id=str(recipient.id), ) - response_data = {"task_id": task.id, "type": "imap"} + response_data = {"task_id": task.message_id, "type": "imap"} if request: messages.info( request, f"Started importing messages from IMAP server for recipient {recipient}. " - "This may take a while. You can check the status in the Celery task monitor.", + "This may take a while.", ) return True, response_data diff --git a/src/backend/core/services/importer/tasks.py b/src/backend/core/services/importer/tasks.py index a000022ab..52c842553 100644 --- a/src/backend/core/services/importer/tasks.py +++ b/src/backend/core/services/importer/tasks.py @@ -5,15 +5,14 @@ from django.core.files.storage import storages +import dramatiq import magic -from celery.utils.log import get_task_logger from core import enums from core.mda.inbound import deliver_inbound_message from core.mda.rfc5322 import parse_email_message from core.models import Mailbox - -from messages.celery_app import app as celery_app +from core.utils import register_task, set_task_progress from .imap import ( IMAPConnectionManager, @@ -24,11 +23,11 @@ select_imap_folder, ) -logger = get_task_logger(__name__) +logger = dramatiq.get_logger(__name__) -@celery_app.task(bind=True) -def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str, Any]: +@register_task +def process_mbox_file_task(file_key: str, recipient_id: str) -> Dict[str, Any]: """ Process a MBOX file asynchronously. @@ -44,6 +43,8 @@ def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str, total_messages = 0 current_message = 0 + set_task_progress(0, {"message": "Loading recipient mailbox"}) + try: recipient = Mailbox.objects.get(id=recipient_id) except Mailbox.DoesNotExist: @@ -56,13 +57,6 @@ def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str, "type": "mbox", "current_message": 0, } - self.update_state( - state="FAILURE", - meta={ - "result": result, - "error": error_msg, - }, - ) return { "status": "FAILURE", "result": result, @@ -70,53 +64,56 @@ def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str, } try: + set_task_progress(5, {"message": "Opening MBOX file"}) + # Get storage and open file message_imports_storage = storages["message-imports"] with message_imports_storage.open(file_key, "rb") as file: - self.update_state( - state="PROGRESS", - meta={ - "result": { - "message_status": "Initializing import", - "type": "mbox", - }, - "error": None, - }, - ) + set_task_progress(10, {"message": "Validating file format"}) + # Ensure file is a valid mbox file content_type = magic.from_buffer(file.read(2048), mime=True) if content_type not in enums.MBOX_SUPPORTED_MIME_TYPES: raise Exception(f"Expected MBOX file, got {content_type}") + set_task_progress(15, {"message": "Counting messages"}) + # First pass: count total messages file.seek(0) total_messages = count_mbox_messages(file) + set_task_progress( + 20, + { + "message": f"Found {total_messages} messages, starting processing", + "total_messages": total_messages, + }, + ) + # Reset file pointer file.seek(0) # Second pass: process messages for i, message_content in enumerate(stream_mbox_messages(file), 1): current_message = i - try: - # Update task state with progress - result = { - "message_status": f"Processing message {i} of {total_messages}", - "total_messages": total_messages, - "success_count": success_count, - "failure_count": failure_count, - "type": "mbox", - "current_message": i, - } - self.update_state( - state="PROGRESS", - meta={ - "result": result, - "error": None, + + # Update progress every 100 messages or at the end + if i % 100 == 0 or i == total_messages: + progress_percentage = min(20 + (i / total_messages) * 70, 90) + set_task_progress( + int(progress_percentage), + { + "message": f"Processing message {i} of {total_messages}", + "total_messages": total_messages, + "success_count": success_count, + "failure_count": failure_count, + "type": "mbox", + "current_message": i, }, ) + try: # Parse the email message parsed_email = parse_email_message(message_content) # Deliver the message @@ -143,13 +140,7 @@ def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str, "current_message": current_message, } - self.update_state( - state="SUCCESS", - meta={ - "result": result, - "error": None, - }, - ) + set_task_progress(100, {"message": "MBOX processing completed successfully"}) return { "status": "SUCCESS", @@ -172,13 +163,6 @@ def process_mbox_file_task(self, file_key: str, recipient_id: str) -> Dict[str, "type": "mbox", "current_message": current_message, } - self.update_state( - state="FAILURE", - meta={ - "result": result, - "error": error_msg, - }, - ) return { "status": "FAILURE", "result": result, @@ -241,9 +225,8 @@ def stream_mbox_messages(file) -> Generator[bytes, None, None]: yield message -@celery_app.task(bind=True) +@register_task def import_imap_messages_task( - self, imap_server: str, imap_port: int, username: str, @@ -270,6 +253,8 @@ def import_imap_messages_task( current_message = 0 try: + set_task_progress(0, {"message": "Connecting to IMAP server"}) + # Get recipient mailbox recipient = Mailbox.objects.get(id=recipient_id) @@ -324,7 +309,6 @@ def import_imap_messages_task( message_list=message_list, recipient=recipient, username=username, - task_instance=self, success_count=success_count, failure_count=failure_count, current_message=current_message, @@ -350,11 +334,6 @@ def import_imap_messages_task( "current_message": current_message, } - self.update_state( - state="SUCCESS", - meta={"status": "SUCCESS", "result": result, "error": None}, - ) - return {"status": "SUCCESS", "result": result, "error": None} except Mailbox.DoesNotExist: @@ -367,7 +346,6 @@ def import_imap_messages_task( "type": "imap", "current_message": 0, } - self.update_state(state="FAILURE", meta={"result": result, "error": error_msg}) return {"status": "FAILURE", "result": result, "error": error_msg} except Exception as e: @@ -382,12 +360,11 @@ def import_imap_messages_task( "type": "imap", "current_message": current_message, } - self.update_state(state="FAILURE", meta={"result": result, "error": error_msg}) return {"status": "FAILURE", "result": result, "error": error_msg} -@celery_app.task(bind=True) -def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, Any]: +@register_task +def process_eml_file_task(file_key: str, recipient_id: str) -> Dict[str, Any]: """ Process an EML file asynchronously. @@ -398,6 +375,9 @@ def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, A Returns: Dict with task status and result """ + + set_task_progress(0, {"message": "Loading recipient mailbox"}) + try: recipient = Mailbox.objects.get(id=recipient_id) except Mailbox.DoesNotExist: @@ -410,36 +390,13 @@ def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, A "type": "eml", "current_message": 0, } - self.update_state( - state="FAILURE", - meta={ - "result": result, - "error": error_msg, - }, - ) return { + "status": "FAILURE", "result": result, "error": error_msg, } try: - # Update progress state - progress_result = { - "message_status": "Processing message 1 of 1", - "total_messages": 1, - "success_count": 0, - "failure_count": 0, - "type": "eml", - "current_message": 1, - } - self.update_state( - state="PROGRESS", - meta={ - "result": progress_result, - "error": None, - }, - ) - # Get storage and read file message_imports_storage = storages["message-imports"] with message_imports_storage.open(file_key, "rb") as file: @@ -467,13 +424,6 @@ def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, A } if success: - self.update_state( - state="SUCCESS", - meta={ - "result": result, - "error": None, - }, - ) return { "status": "SUCCESS", "result": result, @@ -481,13 +431,6 @@ def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, A } error_msg = "Failed to deliver message" - self.update_state( - state="FAILURE", - meta={ - "result": result, - "error": error_msg, - }, - ) return { "status": "FAILURE", "result": result, @@ -509,13 +452,6 @@ def process_eml_file_task(self, file_key: str, recipient_id: str) -> Dict[str, A "type": "eml", "current_message": 1, } - self.update_state( - state="FAILURE", - meta={ - "result": result, - "error": error_msg, - }, - ) return { "status": "FAILURE", "result": result, diff --git a/src/backend/core/services/search/__init__.py b/src/backend/core/services/search/__init__.py index 51ff6886b..0a972ba38 100644 --- a/src/backend/core/services/search/__init__.py +++ b/src/backend/core/services/search/__init__.py @@ -6,9 +6,6 @@ get_opensearch_client, index_message, index_thread, - reindex_all, - reindex_mailbox, - reindex_thread, ) from core.services.search.mapping import MESSAGE_INDEX, MESSAGE_MAPPING from core.services.search.parse import parse_search_query @@ -25,9 +22,6 @@ # Indexing "index_message", "index_thread", - "reindex_all", - "reindex_mailbox", - "reindex_thread", # Parsing "parse_search_query", # Searching diff --git a/src/backend/core/services/search/index.py b/src/backend/core/services/search/index.py index 0f5f6ca8d..ba47de707 100644 --- a/src/backend/core/services/search/index.py +++ b/src/backend/core/services/search/index.py @@ -187,78 +187,3 @@ def index_thread(thread: models.Thread) -> bool: except Exception as e: logger.error("Error indexing thread %s: %s", thread.id, e) return False - - -def reindex_all(): - """Reindex all messages and threads.""" - - # Delete and recreate the index - delete_index() - create_index_if_not_exists() - - # Count indexed items - indexed_threads = 0 - indexed_messages = 0 - - # Index all threads - for thread in models.Thread.objects.all(): - if index_thread(thread): - indexed_threads += 1 - indexed_messages += thread.messages.count() - - return { - "status": "success", - "indexed_threads": indexed_threads, - "indexed_messages": indexed_messages, - } - - -def reindex_mailbox(mailbox_id: str): - """Reindex all messages and threads for a specific mailbox.""" - - # Count indexed items - indexed_threads = 0 - indexed_messages = 0 - - try: - # Get the mailbox - mailbox = models.Mailbox.objects.get(id=mailbox_id) - - # Index all threads the mailbox has access to - for thread in mailbox.threads_viewer: - if index_thread(thread): - indexed_threads += 1 - indexed_messages += thread.messages.count() - - return { - "status": "success", - "mailbox": mailbox_id, - "indexed_threads": indexed_threads, - "indexed_messages": indexed_messages, - } - except models.Mailbox.DoesNotExist: - return {"status": "error", "mailbox": mailbox_id, "error": "Mailbox not found"} - - # pylint: disable=broad-exception-caught - except Exception as e: - logger.error("Error reindexing mailbox %s: %s", mailbox_id, e) - return {"status": "error", "mailbox": mailbox_id, "error": str(e)} - - -def reindex_thread(thread_id: str): - """Reindex a specific thread.""" - - try: - thread = models.Thread.objects.get(id=thread_id) - success = index_thread(thread) - - return { - "status": "success" if success else "error", - "thread": thread_id, - "indexed_messages": thread.messages.count() if success else 0, - } - except models.Thread.DoesNotExist: - return {"status": "error", "thread": thread_id, "error": "Thread not found"} - # pylint: disable=broad-exception-caught - except Exception as e: - return {"status": "error", "thread": thread_id, "error": str(e)} diff --git a/src/backend/core/services/search/tasks.py b/src/backend/core/services/search/tasks.py index 9600b77d3..4968c953e 100644 --- a/src/backend/core/services/search/tasks.py +++ b/src/backend/core/services/search/tasks.py @@ -4,7 +4,7 @@ from django.conf import settings -from celery.utils.log import get_task_logger +import dramatiq from core import models from core.services.search import ( @@ -13,18 +13,14 @@ index_message, index_thread, ) +from core.utils import register_task -from messages.celery_app import app as celery_app +logger = dramatiq.get_logger(__name__) -logger = get_task_logger(__name__) - -def _reindex_all_base(update_progress=None): - """Base function for reindexing all threads and messages. - - Args: - update_progress: Optional callback function to update progress - """ +@register_task +def reindex_all_task(): + """Reindex all threads and messages.""" if not settings.OPENSEARCH_INDEX_THREADS: logger.info("OpenSearch thread indexing is disabled.") return {"success": False, "reason": "disabled"} @@ -50,8 +46,8 @@ def _reindex_all_base(update_progress=None): logger.exception("Error indexing thread %s: %s", thread.id, e) # Update progress if callback provided - if update_progress and i % 100 == 0: - update_progress(i, total, success_count, failure_count) + if i % 100 == 0: + logger.debug("Progress for all threads: %s of %s", i, total) return { "success": True, @@ -61,27 +57,8 @@ def _reindex_all_base(update_progress=None): } -@celery_app.task(bind=True) -def reindex_all(self): - """Celery task wrapper for reindexing all threads and messages.""" - - def update_progress(current, total, success_count, failure_count): - """Update task progress.""" - self.update_state( - state="PROGRESS", - meta={ - "current": current, - "total": total, - "success_count": success_count, - "failure_count": failure_count, - }, - ) - - return _reindex_all_base(update_progress) - - -@celery_app.task(bind=True) -def reindex_thread_task(self, thread_id): +@register_task +def reindex_thread_task(thread_id): """Reindex a specific thread and all its messages.""" if not settings.OPENSEARCH_INDEX_THREADS: logger.info("OpenSearch thread indexing is disabled.") @@ -113,8 +90,8 @@ def reindex_thread_task(self, thread_id): raise -@celery_app.task(bind=True) -def reindex_mailbox_task(self, mailbox_id): +@register_task +def reindex_mailbox_task(mailbox_id): """Reindex all threads and messages in a specific mailbox.""" if not settings.OPENSEARCH_INDEX_THREADS: logger.info("OpenSearch thread indexing is disabled.") @@ -142,14 +119,8 @@ def reindex_mailbox_task(self, mailbox_id): # Update progress every 50 threads if i % 50 == 0: - self.update_state( - state="PROGRESS", - meta={ - "current": i, - "total": total, - "success_count": success_count, - "failure_count": failure_count, - }, + logger.debug( + "Updating progress for mailbox %s: %s of %s", mailbox_id, i, total ) return { @@ -161,8 +132,8 @@ def reindex_mailbox_task(self, mailbox_id): } -@celery_app.task(bind=True) -def index_message_task(self, message_id): +@register_task +def index_message_task(message_id): """Index a single message.""" if not settings.OPENSEARCH_INDEX_THREADS: logger.info("OpenSearch message indexing is disabled.") @@ -201,10 +172,9 @@ def index_message_task(self, message_id): raise -@celery_app.task(bind=True) -def reset_search_index(self): +@register_task +def reset_search_index(): """Delete and recreate the OpenSearch index.""" - delete_index() create_index_if_not_exists() return {"success": True} diff --git a/src/backend/core/signals.py b/src/backend/core/signals.py index e95b5ed0f..b4d668533 100644 --- a/src/backend/core/signals.py +++ b/src/backend/core/signals.py @@ -53,8 +53,8 @@ def index_message_post_save(sender, instance, created, **kwargs): try: # Schedule the indexing task asynchronously - index_message_task.delay(str(instance.id)) - # reindex_thread_task.delay(str(instance.thread.id)) + index_message_task.send(str(instance.id)) + # reindex_thread_task.send(str(instance.thread.id)) # pylint: disable=broad-exception-caught except Exception as e: @@ -74,7 +74,7 @@ def index_message_recipient_post_save(sender, instance, created, **kwargs): try: # Schedule the indexing task asynchronously # TODO: deduplicate the indexing of the message! - index_message_task.delay(str(instance.message.id)) + index_message_task.send(str(instance.message.id)) # pylint: disable=broad-exception-caught except Exception as e: @@ -93,7 +93,7 @@ def index_thread_post_save(sender, instance, created, **kwargs): try: # Schedule the indexing task asynchronously - reindex_thread_task.delay(str(instance.id)) + reindex_thread_task.send(str(instance.id)) # pylint: disable=broad-exception-caught except Exception as e: diff --git a/src/backend/core/tasks.py b/src/backend/core/tasks.py index 02986e842..050499760 100644 --- a/src/backend/core/tasks.py +++ b/src/backend/core/tasks.py @@ -1,7 +1,6 @@ # pylint: disable=wildcard-import, unused-wildcard-import -"""Register all tasks here so that Celery autodiscovery can find them.""" +"""Register all tasks here so that Dramatiq autodiscovery can find them.""" -from core.mda.tasks import * # noqa: F403 -from core.services.dns.tasks import * # noqa: F403 -from core.services.importer.tasks import * # noqa: F403 -from core.services.search.tasks import * # noqa: F403 +# from core.mda.tasks import * +# from core.services.importer.tasks import * +# from core.services.search.tasks import * diff --git a/src/backend/core/templates/admin/index.html b/src/backend/core/templates/admin/index.html new file mode 100644 index 000000000..e791196b9 --- /dev/null +++ b/src/backend/core/templates/admin/index.html @@ -0,0 +1,19 @@ +{% extends "admin/index.html" %} + +{% block content %} +{{ block.super }} + +{% if user.is_staff %} +
+

Task Management

+
+
+ + Dramatiq Dashboard + +

Monitor and manage background tasks

+
+
+
+{% endif %} +{% endblock %} \ No newline at end of file diff --git a/src/backend/core/tests/api/test_messages_import.py b/src/backend/core/tests/api/test_messages_import.py index 49899e1e1..babc7fd93 100644 --- a/src/backend/core/tests/api/test_messages_import.py +++ b/src/backend/core/tests/api/test_messages_import.py @@ -137,7 +137,7 @@ def test_api_import_eml_file(api_client, user, mailbox, eml_file): ) -def test_api_import_mbox_file(api_client, user, mailbox, mbox_file): +def test_api_import_mbox_file(api_client, user, mailbox, mbox_file, worker): """Test import of MBOX file.""" # add access to mailbox mailbox.accesses.create(user=user, role=MailboxRoleChoices.ADMIN) @@ -149,6 +149,9 @@ def test_api_import_mbox_file(api_client, user, mailbox, mbox_file): ) assert response.status_code == 202 assert response.data["type"] == "mbox" + + worker.join() + # Verify messages were created assert Message.objects.count() == 3 messages = Message.objects.order_by("created_at") @@ -184,9 +187,7 @@ def test_api_import_mbox_async(api_client, user, mailbox, mbox_file): """Test import of MBOX file asynchronously.""" # add access to mailbox mailbox.accesses.create(user=user, role=MailboxRoleChoices.ADMIN) - with patch( - "core.services.importer.tasks.process_mbox_file_task.delay" - ) as mock_task: + with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id" mock_task.return_value.status = "PENDING" response = api_client.post( @@ -217,7 +218,7 @@ def test_api_import_imap_task(api_client, user, mailbox): """Test import of IMAP messages.""" mailbox.accesses.create(user=user, role=MailboxRoleChoices.ADMIN) with patch( - "core.services.importer.tasks.import_imap_messages_task.delay" + "core.services.importer.tasks.import_imap_messages_task.send" ) as mock_task: mock_task.return_value.id = "fake-task-id" data = { @@ -335,7 +336,7 @@ def test_api_import_duplicate_eml_file(api_client, user, mailbox, eml_file): assert Thread.objects.count() == 0 # First import - with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task: + with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id-1" response = api_client.post( IMPORT_FILE_URL, @@ -360,7 +361,7 @@ def test_api_import_duplicate_eml_file(api_client, user, mailbox, eml_file): assert Thread.objects.count() == 1 # Import again the same file - with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task: + with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id-2" response = api_client.post( IMPORT_FILE_URL, @@ -394,9 +395,7 @@ def test_api_import_duplicate_mbox_file(api_client, user, mailbox, mbox_file): assert Thread.objects.count() == 0 # First import - with patch( - "core.services.importer.tasks.process_mbox_file_task.delay" - ) as mock_task: + with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id-1" response = api_client.post( IMPORT_FILE_URL, @@ -424,9 +423,7 @@ def test_api_import_duplicate_mbox_file(api_client, user, mailbox, mbox_file): assert Thread.objects.count() == 2 # Second import of the same file - with patch( - "core.services.importer.tasks.process_mbox_file_task.delay" - ) as mock_task: + with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id-2" response = api_client.post( IMPORT_FILE_URL, @@ -466,7 +463,7 @@ def test_api_import_eml_same_message_different_mailboxes(api_client, user, eml_f assert Message.objects.count() == 0 # Import to first mailbox - with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task: + with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id-1" response = api_client.post( IMPORT_FILE_URL, @@ -490,7 +487,7 @@ def test_api_import_eml_same_message_different_mailboxes(api_client, user, eml_f assert Message.objects.count() == 1 # Import to second mailbox - with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task: + with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id-2" response = api_client.post( IMPORT_FILE_URL, @@ -537,9 +534,7 @@ def test_api_import_mbox_same_message_different_mailboxes(api_client, user, mbox assert Message.objects.count() == 0 # Import to first mailbox - with patch( - "core.services.importer.tasks.process_mbox_file_task.delay" - ) as mock_task: + with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id-1" response = api_client.post( IMPORT_FILE_URL, @@ -563,9 +558,7 @@ def test_api_import_mbox_same_message_different_mailboxes(api_client, user, mbox assert Message.objects.count() == 3 # Import to second mailbox - with patch( - "core.services.importer.tasks.process_mbox_file_task.delay" - ) as mock_task: + with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id-2" response = api_client.post( IMPORT_FILE_URL, @@ -767,7 +760,7 @@ def test_api_import_duplicate_imap_messages_different_mailboxes( # assert Thread.objects.count() == 0 # # First import -# with patch("core.mda.tasks.process_mbox_file_task.delay") as mock_task: +# with patch("core.mda.tasks.process_mbox_file_task.send") as mock_task: # mock_task.return_value.id = "fake-task-id-1" # with open(mbox_file_path, "rb") as f: # response = api_client.post( @@ -814,7 +807,7 @@ def test_api_import_duplicate_imap_messages_different_mailboxes( # assert messages[1].thread.messages.count() == 2 # # Second import of the same file -# with patch("core.mda.tasks.process_mbox_file_task.delay") as mock_task: +# with patch("core.mda.tasks.process_mbox_file_task.send") as mock_task: # mock_task.return_value.id = "fake-task-id-2" # with open(mbox_file_path, "rb") as f: # response = api_client.post( @@ -873,7 +866,7 @@ def test_api_import_duplicate_imap_messages_different_mailboxes( # This is another reply to the same thread.""" -# with patch("core.mda.tasks.process_mbox_file_task.delay") as mock_task: +# with patch("core.mda.tasks.process_mbox_file_task.send") as mock_task: # mock_task.return_value.id = "fake-task-id-3" # # Create a new MBOX file with just the new message # new_mbox_content = b"From \n" + new_message_content + b"\n\n" diff --git a/src/backend/core/tests/api/test_send_message_signature.py b/src/backend/core/tests/api/test_send_message_signature.py index 0223ecf79..e3fe8bb3c 100644 --- a/src/backend/core/tests/api/test_send_message_signature.py +++ b/src/backend/core/tests/api/test_send_message_signature.py @@ -118,7 +118,7 @@ def test_api_send_message_with_signature_placeholder( with patch("core.api.viewsets.send.send_message_task") as mock_task: mock_task_instance = MagicMock() mock_task_instance.id = "task-123" - mock_task.delay.return_value = mock_task_instance + mock_task.send.return_value = mock_task_instance # Send request response = client.post( @@ -158,7 +158,7 @@ def test_api_send_message_without_signature_placeholder( with patch("core.api.viewsets.send.send_message_task") as mock_task: mock_task_instance = MagicMock() mock_task_instance.id = "task-123" - mock_task.delay.return_value = mock_task_instance + mock_task.send.return_value = mock_task_instance # Send request response = client.post( @@ -196,7 +196,7 @@ def test_api_send_message_with_text_body_only( with patch("core.api.viewsets.send.send_message_task") as mock_task: mock_task_instance = MagicMock() mock_task_instance.id = "task-123" - mock_task.delay.return_value = mock_task_instance + mock_task.send.return_value = mock_task_instance # Send request with text body only response = client.post( @@ -241,7 +241,7 @@ def test_api_send_message_with_html_body_only( with patch("core.api.viewsets.send.send_message_task") as mock_task: mock_task_instance = MagicMock() mock_task_instance.id = "task-123" - mock_task.delay.return_value = mock_task_instance + mock_task.send.return_value = mock_task_instance # Send request with HTML body only response = client.post( @@ -292,7 +292,7 @@ def test_api_send_message_with_signature_and_reply( with patch("core.api.viewsets.send.send_message_task") as mock_task: mock_task_instance = MagicMock() mock_task_instance.id = "task-123" - mock_task.delay.return_value = mock_task_instance + mock_task.send.return_value = mock_task_instance # Send request response = client.post( @@ -333,7 +333,7 @@ def test_api_send_message_with_archive_true( with patch("core.api.viewsets.send.send_message_task") as mock_task: mock_task_instance = MagicMock() mock_task_instance.id = "task-123" - mock_task.delay.return_value = mock_task_instance + mock_task.send.return_value = mock_task_instance # Send request with HTML body only response = client.post( @@ -350,7 +350,7 @@ def test_api_send_message_with_archive_true( assert response.status_code == status.HTTP_200_OK assert response.data["task_id"] == "task-123" - mock_task.delay.assert_called_once_with( + mock_task.send.assert_called_once_with( str(draft_message.id), must_archive=True ) @@ -366,7 +366,7 @@ def test_api_send_message_with_archive_false( with patch("core.api.viewsets.send.send_message_task") as mock_task: mock_task_instance = MagicMock() mock_task_instance.id = "task-123" - mock_task.delay.return_value = mock_task_instance + mock_task.send.return_value = mock_task_instance # Send request with archive=False response = client.post( @@ -385,7 +385,7 @@ def test_api_send_message_with_archive_false( assert response.data["task_id"] == "task-123" # Verify the task was called with must_archive=False - mock_task.delay.assert_called_once_with( + mock_task.send.assert_called_once_with( str(draft_message.id), must_archive=False ) @@ -401,7 +401,7 @@ def test_api_send_message_without_archive_parameter( with patch("core.api.viewsets.send.send_message_task") as mock_task: mock_task_instance = MagicMock() mock_task_instance.id = "task-123" - mock_task.delay.return_value = mock_task_instance + mock_task.send.return_value = mock_task_instance # Send request without archive parameter response = client.post( @@ -419,6 +419,6 @@ def test_api_send_message_without_archive_parameter( assert response.data["task_id"] == "task-123" # Verify the task was called with must_archive=False (default) - mock_task.delay.assert_called_once_with( + mock_task.send.assert_called_once_with( str(draft_message.id), must_archive=False ) diff --git a/src/backend/core/tests/api/test_task.py b/src/backend/core/tests/api/test_task.py new file mode 100644 index 000000000..43481c9cf --- /dev/null +++ b/src/backend/core/tests/api/test_task.py @@ -0,0 +1,145 @@ +"""Tests for task API endpoints.""" + +import time +import uuid + +from django.core.cache import cache +from django.test import TestCase + +import pytest +from rest_framework import status +from rest_framework.test import APIClient + +from core import factories +from core.mda.tasks import send_message_task +from core.utils import get_task_progress + + +class TaskDetailViewTest(TestCase): + """Test the TaskDetailView API endpoint.""" + + def setUp(self): + """Set up test data.""" + self.client = APIClient() + self.task_id = uuid.uuid4() + self.url = f"/api/v1.0/tasks/{self.task_id}/" + + self.user = factories.UserFactory() + self.client.force_authenticate(user=self.user) + + def _set_progress_data(self, task_id, progress, metadata=None): + """Helper method to set progress data directly in cache for testing.""" + progress_data = { + "progress": progress, + "timestamp": time.time(), + "metadata": metadata or {}, + } + cache.set(f"task_progress:{task_id}", progress_data, timeout=86400) + + def test_task_status_pending(self): + """Test task status when no progress data exists.""" + # Don't set any progress data - should return PENDING + response = self.client.get(self.url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["status"], "PENDING") + self.assertIsNone(response.data["result"]) + self.assertIsNone(response.data["error"]) + + def test_task_status_progress(self): + """Test task status when progress data exists.""" + # Set real progress data using helper method + self._set_progress_data(self.task_id, 75, {"message": "Processing batch 3"}) + + response = self.client.get(self.url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["status"], "PROGRESS") + self.assertEqual(response.data["progress"], 75) + self.assertEqual(response.data["message"], "Processing batch 3") + self.assertIsNotNone(response.data["timestamp"]) + self.assertIsNone(response.data["result"]) + self.assertIsNone(response.data["error"]) + + def test_task_status_progress_no_message(self): + """Test task status when progress data exists but no message.""" + # Set progress data without message + self._set_progress_data(self.task_id, 50, {}) + + response = self.client.get(self.url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["status"], "PROGRESS") + self.assertEqual(response.data["progress"], 50) + self.assertIsNone(response.data["message"]) + self.assertIsNotNone(response.data["timestamp"]) + + def test_task_status_requires_authentication(self): + """Test that the endpoint requires authentication.""" + # Don't authenticate the client + response = APIClient().get(self.url) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_task_status_with_different_task_ids(self): + """Test that different task IDs work correctly.""" + task_id_2 = uuid.uuid4() + url_2 = f"/api/v1.0/tasks/{task_id_2}/" + + # Set progress for the second task + self._set_progress_data(task_id_2, 25, {"message": "Starting task"}) + + response = self.client.get(url_2) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["status"], "PROGRESS") + self.assertEqual(response.data["progress"], 25) + self.assertEqual(response.data["message"], "Starting task") + + # First task should still be pending + response = self.client.get(self.url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["status"], "PENDING") + + def test_task_progress_retrieval(self): + """Test that we can retrieve progress data directly.""" + # Set progress data using helper method + self._set_progress_data(self.task_id, 90, {"message": "Almost done"}) + + # Retrieve it directly + progress_data = get_task_progress(self.task_id) + + self.assertIsNotNone(progress_data) + self.assertEqual(progress_data["progress"], 90) + self.assertEqual(progress_data["metadata"]["message"], "Almost done") + self.assertIsNotNone(progress_data["timestamp"]) + + def test_task_progress_nonexistent(self): + """Test that nonexistent task returns None.""" + nonexistent_task_id = "nonexistent-task-999" + + progress_data = get_task_progress(nonexistent_task_id) + + self.assertIsNone(progress_data) + + +@pytest.mark.django_db +def test_task_api_integration(worker): + """Integration test with actual Dramatiq task.""" + # Create a test message (you might need to adjust this based on your models) + # This is a basic integration test to verify the task API works with real tasks + + # Send a task + result = send_message_task.send("test-message-id") + task_id = result.message_id + + # Process the task synchronously + worker.join() + + # Test the API endpoint + client = APIClient() + response = client.get(f"/api/v1/tasks/{task_id}/") + + # Should return a valid response + assert response.status_code == 401 # Unauthorized without auth + # In a real test, you'd authenticate the client first diff --git a/src/backend/core/tests/conftest.py b/src/backend/core/tests/conftest.py index 6cb6f9f6a..3426593fa 100644 --- a/src/backend/core/tests/conftest.py +++ b/src/backend/core/tests/conftest.py @@ -2,6 +2,7 @@ from unittest import mock +import dramatiq import pytest USER = "user" @@ -9,6 +10,23 @@ VIA = [USER, TEAM] +@pytest.fixture(name="worker_broker") +def fixture_worker_broker(): + """Fixture that provides a clean StubBroker for testing.""" + broker = dramatiq.get_broker() + broker.flush_all() + return broker + + +@pytest.fixture(name="worker") +def fixture_worker(worker_broker): + """Fixture that provides a Dramatiq worker for testing.""" + worker = dramatiq.Worker(worker_broker, worker_timeout=100) + worker.start() + yield worker + worker.stop() + + @pytest.fixture def mock_user_teams(): """Mock for the "teams" property on the User model.""" @@ -16,16 +34,3 @@ def mock_user_teams(): "core.models.User.teams", new_callable=mock.PropertyMock ) as mock_teams: yield mock_teams - - -# @pytest.fixture -# @pytest.mark.django_db -# def create_testdomain(): -# """Create the TESTDOMAIN.""" -# from core import models -# models.MailDomain.objects.get_or_create( -# name=settings.MESSAGES_TESTDOMAIN, -# defaults={ -# "oidc_autojoin": True -# } -# ) diff --git a/src/backend/core/tests/importer/test_file_import.py b/src/backend/core/tests/importer/test_file_import.py index 5d5e918ea..86940b930 100644 --- a/src/backend/core/tests/importer/test_file_import.py +++ b/src/backend/core/tests/importer/test_file_import.py @@ -107,7 +107,7 @@ def test_import_eml_file(admin_client, eml_file, mailbox): mock_task.update_state = MagicMock() with ( - patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_delay, + patch("core.services.importer.tasks.process_eml_file_task.send") as mock_delay, patch.object(process_eml_file_task, "update_state", mock_task.update_state), ): mock_delay.return_value.id = "fake-task-id" @@ -183,7 +183,7 @@ def test_import_eml_file(admin_client, eml_file, mailbox): @pytest.mark.django_db def test_process_mbox_file_task(mailbox, mbox_file): - """Test the Celery task that processes MBOX files.""" + """Test the task that processes MBOX files.""" # Create a mock task instance mock_task = MagicMock() mock_task.update_state = MagicMock() diff --git a/src/backend/core/tests/importer/test_imap_import.py b/src/backend/core/tests/importer/test_imap_import.py index 0558ba620..21bdf8feb 100644 --- a/src/backend/core/tests/importer/test_imap_import.py +++ b/src/backend/core/tests/importer/test_imap_import.py @@ -14,8 +14,6 @@ from core.models import Mailbox, MailDomain, Message, Thread from core.services.importer.tasks import import_imap_messages_task -from messages.celery_app import app as celery_app - @pytest.fixture def admin_user(db): @@ -163,7 +161,7 @@ def test_imap_import_form_view(admin_client, mailbox): } with patch( - "core.services.importer.tasks.import_imap_messages_task.delay" + "core.services.importer.tasks.import_imap_messages_task.send" ) as mock_task: response = admin_client.post(url, form_data, follow=True) assert response.status_code == 200 @@ -174,13 +172,11 @@ def test_imap_import_form_view(admin_client, mailbox): @patch("imaplib.IMAP4_SSL") -@patch.object(celery_app.backend, "store_result") def test_imap_import_task_success( - mock_store_result, mock_imap4_ssl, mailbox, mock_imap_connection, sample_email + mock_imap4_ssl, mailbox, mock_imap_connection, sample_email ): """Test successful IMAP import task execution.""" mock_imap4_ssl.return_value = mock_imap_connection - mock_store_result.return_value = None # Create a mock task instance mock_task = MagicMock() @@ -310,12 +306,11 @@ def test_imap_import_task_login_failure(mailbox): @patch("imaplib.IMAP4_SSL") -@patch.object(celery_app.backend, "store_result") +@patch("core.services.importer.tasks.set_task_progress") def test_imap_import_task_message_fetch_failure( - mock_store_result, mock_imap4_ssl, mailbox + mock_set_task_progress, mock_imap4_ssl, mailbox ): """Test IMAP import task with message fetch failure.""" - mock_store_result.return_value = None mock_imap = MagicMock() mock_imap.login.return_value = ("OK", [b"Logged in"]) @@ -330,67 +325,56 @@ def test_imap_import_task_message_fetch_failure( mock_imap.logout.return_value = ("OK", [b"Logged out"]) mock_imap4_ssl.return_value = mock_imap - # Create a mock task instance - mock_task = MagicMock() - mock_task.update_state = MagicMock() - - with patch.object( - import_imap_messages_task, "update_state", mock_task.update_state - ): - # Run the task - task = import_imap_messages_task( - imap_server="imap.example.com", - imap_port=993, - username="test@example.com", - password="password123", - use_ssl=True, - recipient_id=str(mailbox.id), - ) - - # Verify all messages failed - assert task["status"] == "SUCCESS" - assert ( - task["result"]["message_status"] - == "Completed processing messages from folder 'INBOX'" - ) - assert task["result"]["type"] == "imap" - assert task["result"]["total_messages"] == 3 - assert task["result"]["success_count"] == 0 - assert task["result"]["failure_count"] == 3 - assert task["result"]["current_message"] == 3 + mock_set_task_progress = MagicMock() - # Verify progress updates were called correctly - assert mock_task.update_state.call_count == 4 # 3 PROGRESS + 1 SUCCESS + # Run the task + task = import_imap_messages_task( + imap_server="imap.example.com", + imap_port=993, + username="test@example.com", + password="password123", + use_ssl=True, + recipient_id=str(mailbox.id), + ) - # Verify progress updates - for i in range(1, 4): - mock_task.update_state.assert_any_call( - state="PROGRESS", - meta={ - "result": { - "message_status": f"Processing message {i} of 3", - "total_messages": 3, - "success_count": 0, - "failure_count": i, # Current message failed - "type": "imap", - "current_message": i, - }, - "error": None, + # Verify all messages failed + assert task["status"] == "SUCCESS" + assert ( + task["result"]["message_status"] + == "Completed processing messages from folder 'INBOX'" + ) + assert task["result"]["type"] == "imap" + assert task["result"]["total_messages"] == 3 + assert task["result"]["success_count"] == 0 + assert task["result"]["failure_count"] == 3 + assert task["result"]["current_message"] == 3 + + # Verify progress updates were called correctly + assert mock_set_task_progress.call_count == 3 # 3 PROGRESS + + # Verify progress updates + for i in range(1, 4): + mock_set_task_progress.assert_any_call( + state="PROGRESS", + meta={ + "result": { + "message_status": f"Processing message {i} of 3", + "total_messages": 3, + "success_count": 0, + "failure_count": i, # Current message failed + "type": "imap", + "current_message": i, }, - ) - - # Verify success update - mock_task.update_state.assert_any_call( - state="SUCCESS", - meta=task, + "error": None, + }, ) @patch("core.mda.inbound.logger") @patch("imaplib.IMAP4_SSL") -@patch.object(celery_app.backend, "store_result") +@patch("core.services.importer.tasks.set_task_progress") def test_imap_import_task_duplicate_recipients( - mock_store_result, + mock_set_task_progress, mock_imap4_ssl, mock_logger, mailbox, @@ -398,7 +382,6 @@ def test_imap_import_task_duplicate_recipients( ): """Test IMAP import task with duplicate recipients handles deduplication correctly.""" mock_imap4_ssl.return_value = mock_imap_connection_with_duplicates - mock_store_result.return_value = None # Create a mock task instance mock_task = MagicMock() diff --git a/src/backend/core/tests/importer/test_import_service.py b/src/backend/core/tests/importer/test_import_service.py index 9f36704da..29e1b7bf2 100644 --- a/src/backend/core/tests/importer/test_import_service.py +++ b/src/backend/core/tests/importer/test_import_service.py @@ -135,7 +135,7 @@ def mbox_key(user, mbox_file): @pytest.mark.django_db def test_import_file_eml_by_superuser(admin_user, mailbox, eml_key, mock_request): """Test successful EML file import for superuser.""" - with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task: + with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id" success, response_data = ImportService.import_file( file_key=eml_key, @@ -235,7 +235,7 @@ def test_import_file_eml_by_user_with_access_task(user, mailbox, eml_key, mock_r # Add access to mailbox mailbox.accesses.create(user=user, role=MailboxRoleChoices.ADMIN) - with patch("core.services.importer.tasks.process_eml_file_task.delay") as mock_task: + with patch("core.services.importer.tasks.process_eml_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id" success, response_data = ImportService.import_file( file_key=eml_key, @@ -338,9 +338,7 @@ def test_import_file_mbox_by_superuser_task( ): """Test successful MBOX file import by superuser.""" - with patch( - "core.services.importer.tasks.process_mbox_file_task.delay" - ) as mock_task: + with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id" success, response_data = ImportService.import_file( file_key=mbox_key, @@ -363,9 +361,7 @@ def test_import_file_mbox_by_user_with_access_task( # Add access to mailbox mailbox.accesses.create(user=user, role=MailboxRoleChoices.ADMIN) - with patch( - "core.services.importer.tasks.process_mbox_file_task.delay" - ) as mock_task: + with patch("core.services.importer.tasks.process_mbox_file_task.send") as mock_task: mock_task.return_value.id = "fake-task-id" success, response_data = ImportService.import_file( file_key=mbox_key, @@ -443,7 +439,7 @@ def test_import_file_invalid_file(admin_user, mailbox, mock_request): try: with patch( - "core.services.importer.tasks.process_eml_file_task.delay" + "core.services.importer.tasks.process_eml_file_task.send" ) as mock_task: # The task should not be called for invalid files mock_task.assert_not_called() @@ -470,7 +466,7 @@ def test_import_file_invalid_file(admin_user, mailbox, mock_request): def test_import_imap_by_superuser(admin_user, mailbox, mock_request): """Test successful IMAP import.""" with patch( - "core.services.importer.tasks.import_imap_messages_task.delay" + "core.services.importer.tasks.import_imap_messages_task.send" ) as mock_task: mock_task.return_value.id = "fake-task-id" success, response_data = ImportService.import_imap( @@ -504,7 +500,7 @@ def test_import_imap_by_user_with_access(user, mailbox, mock_request, role): mailbox.accesses.create(user=user, role=role) with patch( - "core.services.importer.tasks.import_imap_messages_task.delay" + "core.services.importer.tasks.import_imap_messages_task.send" ) as mock_task: mock_task.return_value.id = "fake-task-id" success, response_data = ImportService.import_imap( @@ -550,7 +546,7 @@ def test_import_imap_task_error(admin_user, mailbox, mock_request): mailbox.accesses.create(user=admin_user, role=MailboxRoleChoices.ADMIN) with patch( - "core.services.importer.tasks.import_imap_messages_task.delay" + "core.services.importer.tasks.import_imap_messages_task.send" ) as mock_task: mock_task.side_effect = Exception("Task error") success, response_data = ImportService.import_imap( diff --git a/src/backend/core/tests/search/test_search.py b/src/backend/core/tests/search/test_search.py index 25371df05..1d33ddff6 100644 --- a/src/backend/core/tests/search/test_search.py +++ b/src/backend/core/tests/search/test_search.py @@ -17,10 +17,12 @@ delete_index, index_message, index_thread, - reindex_all, - reindex_mailbox, search_threads, ) +from core.services.search.tasks import ( + reindex_all_task, + reindex_mailbox_task, +) @pytest.fixture(name="mock_es_client_search") @@ -137,7 +139,7 @@ def test_reindex_all(mock_es_client_index): mock_es_client_index.indices.exists.return_value = False # Call the function - result = reindex_all() + result = reindex_all_task() # Verify result assert result["status"] == "success" @@ -152,7 +154,7 @@ def test_reindex_mailbox(mock_es_client_index, test_mailbox, test_thread): """Test reindexing a specific mailbox.""" # Call the function - result = reindex_mailbox(str(test_mailbox.id)) + result = reindex_mailbox_task(str(test_mailbox.id)) assert mock_es_client_index.index.call_count > 0 diff --git a/src/backend/core/utils.py b/src/backend/core/utils.py index 299b191f7..5568aab30 100644 --- a/src/backend/core/utils.py +++ b/src/backend/core/utils.py @@ -1,8 +1,26 @@ """Root utils for the core application.""" import json +import time +from typing import Any, Dict, Optional + +from django.core.cache import cache from configurations import values +from dramatiq import actor +from dramatiq.middleware import CurrentMessage + + +def register_task(func): + """Register a function as a Dramatiq task with result storage enabled. + + Args: + func: The function to register as a task + + Returns: + The decorated function as a Dramatiq actor + """ + return actor(store_results=True)(func) class JSONValue(values.Value): @@ -16,3 +34,36 @@ def to_python(self, value): Return the python representation of the JSON string. """ return json.loads(value) + + +def set_task_progress(progress: int, metadata: Optional[Dict[str, Any]] = None) -> None: + """Set task progress in cache. + + Args: + progress: Progress percentage (0-100) + metadata: Optional metadata dictionary + """ + # Get the current message ID from Dramatiq CurrentMessage middleware + current_message = CurrentMessage.get_current_message() + if not current_message: + return # Do nothing if no current message + + task_id = current_message.message_id + progress_data = { + "progress": progress, + "timestamp": time.time(), + "metadata": metadata or {}, + } + cache.set(f"task_progress:{task_id}", progress_data, timeout=86400) # 24 hours + + +def get_task_progress(task_id: str) -> Optional[Dict[str, Any]]: + """Get task progress from cache. + + Args: + task_id: Task identifier + + Returns: + Progress data or None if not found + """ + return cache.get(f"task_progress:{task_id}") diff --git a/src/backend/core/worker_setup.py b/src/backend/core/worker_setup.py new file mode 100644 index 000000000..60e809c6a --- /dev/null +++ b/src/backend/core/worker_setup.py @@ -0,0 +1,11 @@ +"""Worker setup module for Dramatiq workers. + +This module configures Django and imports configurations for worker processes. +""" + +import django + +from configurations.importer import install + +install(check_options=True) +django.setup() diff --git a/src/backend/messages/__init__.py b/src/backend/messages/__init__.py index bf4eac6f6..db8a90f94 100644 --- a/src/backend/messages/__init__.py +++ b/src/backend/messages/__init__.py @@ -1,5 +1 @@ """Messages module.""" - -from .celery_app import app as celery_app - -__all__ = ("celery_app",) diff --git a/src/backend/messages/celery_app.py b/src/backend/messages/celery_app.py deleted file mode 100644 index 0b2cb7db8..000000000 --- a/src/backend/messages/celery_app.py +++ /dev/null @@ -1,43 +0,0 @@ -"""Messages celery configuration file.""" - -import os - -from celery import Celery -from configurations.importer import install - -# Set the default Django settings module for the 'celery' program. -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "messages.settings") -os.environ.setdefault("DJANGO_CONFIGURATION", "Development") - -install(check_options=True) - -# Must be imported after install() -from django.conf import settings # pylint: disable=wrong-import-position - -app = Celery("messages") - -# Using a string here means the worker doesn't have to serialize -# the configuration object to child processes. -# - namespace='CELERY' means all celery-related configuration keys -# should have a `CELERY_` prefix. -app.config_from_object("django.conf:settings", namespace="CELERY") - -# Load task modules from all registered Django apps. -app.autodiscover_tasks() - -# Configure beat schedule -# This can be disabled manually, for example when pushing the application for the first time -# to a PaaS service when no migration was applied yet. -if not settings.DISABLE_CELERY_BEAT_SCHEDULE: - app.conf.beat_schedule = { - "retry-pending-messages": { - "task": "core.mda.tasks.retry_messages_task", - "schedule": 300.0, # Every 5 minutes (300 seconds) - "options": {"queue": "default"}, - }, - "selfcheck": { - "task": "core.mda.tasks.selfcheck_task", - "schedule": settings.MESSAGES_SELFCHECK_INTERVAL, - "options": {"queue": "default"}, - }, - } diff --git a/src/backend/messages/settings.py b/src/backend/messages/settings.py index e56a9f29c..f96f2762a 100755 --- a/src/backend/messages/settings.py +++ b/src/backend/messages/settings.py @@ -441,8 +441,8 @@ class Base(Configuration): "drf_spectacular", # Third party apps "corsheaders", - "django_celery_beat", - "django_celery_results", + "django_dramatiq", + "dramatiq_crontab", "django_filters", "rest_framework", "parler", @@ -528,19 +528,62 @@ class Base(Configuration): None, environ_name="FRONTEND_THEME", environ_prefix=None ) - # Celery - CELERY_BROKER_URL = values.Value( - "redis://redis:6379", environ_name="CELERY_BROKER_URL", environ_prefix=None - ) - CELERY_RESULT_BACKEND = "django-db" - CELERY_CACHE_BACKEND = "django-cache" - CELERY_BROKER_TRANSPORT_OPTIONS = values.DictValue({}) - CELERY_RESULT_EXTENDED = True - CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 * 30 # 30 days - CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler" + # Dramatiq + DRAMATIQ_BROKER = { + "BROKER": "dramatiq.brokers.redis.RedisBroker", + "OPTIONS": { + "url": values.Value( + "redis://redis:6379", + environ_name="DRAMATIQ_BROKER_URL", + environ_prefix=None, + ), + }, + "MIDDLEWARE": [ + "dramatiq.middleware.Prometheus", + "dramatiq.middleware.AgeLimit", + "dramatiq.middleware.TimeLimit", + "dramatiq.middleware.Callbacks", + "dramatiq.middleware.Retries", + "dramatiq.middleware.CurrentMessage", + # "dramatiq.results.middleware.Results", + "django_dramatiq.middleware.DbConnectionsMiddleware", + "django_dramatiq.middleware.AdminMiddleware", + ], + } + + DRAMATIQ_RESULT_BACKEND = { + "BACKEND": "dramatiq.results.backends.redis.RedisBackend", + "BACKEND_OPTIONS": { + "url": values.Value( + "redis://redis:6379", + environ_name="DRAMATIQ_RESULT_BACKEND", + environ_prefix=None, + ), + }, + "MIDDLEWARE_OPTIONS": { + "result_ttl": 7 * 24 * 60 * 60 # 7 days + }, + } + + # Defines which database should be used to persist Task objects when the + # AdminMiddleware is enabled. The default value is "default". + DRAMATIQ_TASKS_DATABASE = "default" + + # Dramatiq Crontab configuration + DRAMATIQ_CRONTAB = { + "REDIS_URL": values.Value( + "redis://redis:6379/0", + environ_name="DRAMATIQ_CRONTAB_REDIS_URL", + environ_prefix=None, + ), + } - DISABLE_CELERY_BEAT_SCHEDULE = values.BooleanValue( - default=False, environ_name="DISABLE_CELERY_BEAT_SCHEDULE", environ_prefix=None + DRAMATIQ_AUTODISCOVER_MODULES = [ + "tasks" + ] # services.search.tasks", "services.importer.tasks"] + + DISABLE_DRAMATIQ_SCHEDULE = values.BooleanValue( + default=False, environ_name="DISABLE_DRAMATIQ_SCHEDULE", environ_prefix=None ) # Session @@ -900,7 +943,21 @@ class DevelopmentMinimal(Development): Development environment settings with minimal dependencies """ - CELERY_TASK_ALWAYS_EAGER = True + DRAMATIQ_BROKER = { + "BROKER": "dramatiq.brokers.stub.StubBroker", + "OPTIONS": {}, + "MIDDLEWARE": [ + "dramatiq.middleware.AgeLimit", + "dramatiq.middleware.TimeLimit", + "dramatiq.middleware.Callbacks", + "dramatiq.middleware.Retries", + "dramatiq.middleware.CurrentMessage", + "dramatiq.middleware.Result", + "django_dramatiq.middleware.DbConnectionsMiddleware", + "django_dramatiq.middleware.AdminMiddleware", + ], + } + OPENSEARCH_INDEX_THREADS = False CACHES = { "default": { @@ -920,7 +977,20 @@ class Test(Base): IDENTITY_PROVIDER = None - CELERY_TASK_ALWAYS_EAGER = values.BooleanValue(True) + DRAMATIQ_BROKER = { + "BROKER": "dramatiq.brokers.stub.StubBroker", + "OPTIONS": {}, + "MIDDLEWARE": [ + "dramatiq.middleware.AgeLimit", + "dramatiq.middleware.TimeLimit", + "dramatiq.middleware.Callbacks", + "dramatiq.middleware.Retries", + "dramatiq.middleware.CurrentMessage", + # "dramatiq.results.middleware.Results", + "django_dramatiq.middleware.DbConnectionsMiddleware", + "django_dramatiq.middleware.AdminMiddleware", + ], + } AWS_S3_DOMAIN_REPLACE = None diff --git a/src/backend/messages/urls.py b/src/backend/messages/urls.py index 4b6dc5581..2871ac3fa 100644 --- a/src/backend/messages/urls.py +++ b/src/backend/messages/urls.py @@ -2,7 +2,6 @@ from django.conf import settings from django.conf.urls.static import static -from django.contrib import admin from django.contrib.staticfiles.urls import staticfiles_urlpatterns from django.http import HttpResponse from django.urls import include, path, re_path @@ -13,8 +12,10 @@ SpectacularSwaggerView, ) +from core.admin import admin_site + urlpatterns = [ - path(settings.ADMIN_URL, admin.site.urls), + path(settings.ADMIN_URL, admin_site.urls), path("", include("core.urls")), path( "healthz/", diff --git a/src/backend/poetry.lock b/src/backend/poetry.lock index f53a9c846..916b6ee43 100644 --- a/src/backend/poetry.lock +++ b/src/backend/poetry.lock @@ -16,9 +16,10 @@ files = [ name = "amqp" version = "5.3.1" description = "Low-level AMQP client for Python (fork of amqplib)." -optional = false +optional = true python-versions = ">=3.6" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "amqp-5.3.1-py3-none-any.whl", hash = "sha256:43b3319e1b4e7d1251833a93d672b4af1e40f3d632d479b98661a95f117880a2"}, {file = "amqp-5.3.1.tar.gz", hash = "sha256:cddc00c725449522023bad949f70fff7b48f0b1ade74d170a6f10ab044739432"}, @@ -58,6 +59,34 @@ sniffio = ">=1.1" [package.extras] trio = ["trio (>=0.26.1)"] +[[package]] +name = "apscheduler" +version = "3.11.0" +description = "In-process task scheduler with Cron-like capabilities" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "APScheduler-3.11.0-py3-none-any.whl", hash = "sha256:fc134ca32e50f5eadcc4938e3a4545ab19131435e851abb40b34d63d5141c6da"}, + {file = "apscheduler-3.11.0.tar.gz", hash = "sha256:4c622d250b0955a65d5d0eb91c33e6d43fd879834bf541e0a18661ae60460133"}, +] + +[package.dependencies] +tzlocal = ">=3.0" + +[package.extras] +doc = ["packaging", "sphinx", "sphinx-rtd-theme (>=1.3.0)"] +etcd = ["etcd3", "protobuf (<=3.21.0)"] +gevent = ["gevent"] +mongodb = ["pymongo (>=3.0)"] +redis = ["redis (>=3.0)"] +rethinkdb = ["rethinkdb (>=2.4.0)"] +sqlalchemy = ["sqlalchemy (>=1.4)"] +test = ["APScheduler[etcd,mongodb,redis,rethinkdb,sqlalchemy,tornado,zookeeper]", "PySide6 ; platform_python_implementation == \"CPython\" and python_version < \"3.14\"", "anyio (>=4.5.2)", "gevent ; python_version < \"3.14\"", "pytest", "pytz", "twisted ; python_version < \"3.14\""] +tornado = ["tornado (>=4.3)"] +twisted = ["twisted"] +zookeeper = ["kazoo"] + [[package]] name = "asgiref" version = "3.9.1" @@ -122,9 +151,10 @@ tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" a name = "billiard" version = "4.2.1" description = "Python multiprocessing fork with improvements and bugfixes" -optional = false +optional = true python-versions = ">=3.7" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "billiard-4.2.1-py3-none-any.whl", hash = "sha256:40b59a4ac8806ba2c2369ea98d876bc6108b051c227baffd928c644d15d8f3cb"}, {file = "billiard-4.2.1.tar.gz", hash = "sha256:12b641b0c539073fc8d3f5b8b7be998956665c4233c7c1fcd66a7e677c4fb36f"}, @@ -216,9 +246,10 @@ redis = ["redis (>=2.10.5)"] name = "celery" version = "5.5.2" description = "Distributed Task Queue." -optional = false +optional = true python-versions = ">=3.8" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "celery-5.5.2-py3-none-any.whl", hash = "sha256:54425a067afdc88b57cd8d94ed4af2ffaf13ab8c7680041ac2c4ac44357bdf4c"}, {file = "celery-5.5.2.tar.gz", hash = "sha256:4d6930f354f9d29295425d7a37261245c74a32807c45d764bedc286afd0e724e"}, @@ -232,7 +263,6 @@ click-plugins = ">=1.1.1" click-repl = ">=0.2.0" kombu = ">=5.5.2,<5.6" python-dateutil = ">=2.8.2" -redis = {version = ">=4.5.2,<4.5.5 || >4.5.5,<6.0.0", optional = true, markers = "extra == \"redis\""} vine = ">=5.1.0,<6.0" [package.extras] @@ -468,9 +498,10 @@ files = [ name = "click" version = "8.2.1" description = "Composable command line interface toolkit" -optional = false +optional = true python-versions = ">=3.10" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "click-8.2.1-py3-none-any.whl", hash = "sha256:61a3265b914e850b85317d0b3109c7f8cd35a670f963866005d6ef1d5175a12b"}, {file = "click-8.2.1.tar.gz", hash = "sha256:27c491cc05d968d271d5a1db13e3b5a184636d9d930f148c50b038f0d0646202"}, @@ -483,9 +514,10 @@ colorama = {version = "*", markers = "platform_system == \"Windows\""} name = "click-didyoumean" version = "0.3.1" description = "Enables git-like *did-you-mean* feature in click" -optional = false +optional = true python-versions = ">=3.6.2" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "click_didyoumean-0.3.1-py3-none-any.whl", hash = "sha256:5c4bb6007cfea5f2fd6583a2fb6701a22a41eb98957e63d0fac41c10e7c3117c"}, {file = "click_didyoumean-0.3.1.tar.gz", hash = "sha256:4f82fdff0dbe64ef8ab2279bd6aa3f6a99c3b28c05aa09cbfc07c9d7fbb5a463"}, @@ -498,9 +530,10 @@ click = ">=7" name = "click-plugins" version = "1.1.1.2" description = "An extension module for click to enable registering CLI commands via setuptools entry-points." -optional = false +optional = true python-versions = "*" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "click_plugins-1.1.1.2-py2.py3-none-any.whl", hash = "sha256:008d65743833ffc1f5417bf0e78e8d2c23aab04d9745ba817bd3e71b0feb6aa6"}, {file = "click_plugins-1.1.1.2.tar.gz", hash = "sha256:d7af3984a99d243c131aa1a828331e7630f4a88a9741fd05c927b204bcf92261"}, @@ -516,9 +549,10 @@ dev = ["coveralls", "pytest (>=3.6)", "pytest-cov", "wheel"] name = "click-repl" version = "0.3.0" description = "REPL plugin for Click" -optional = false +optional = true python-versions = ">=3.6" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "click-repl-0.3.0.tar.gz", hash = "sha256:17849c23dba3d667247dc4defe1757fff98694e90fe37474f3feebb69ced26a9"}, {file = "click_repl-0.3.0-py3-none-any.whl", hash = "sha256:fb7e06deb8da8de86180a33a9da97ac316751c094c6899382da7feeeeb51b812"}, @@ -646,25 +680,6 @@ files = [ [package.extras] toml = ["tomli ; python_full_version <= \"3.11.0a6\""] -[[package]] -name = "cron-descriptor" -version = "2.0.6" -description = "A Python library that converts cron expressions into human readable strings." -optional = false -python-versions = ">=3.9" -groups = ["main"] -files = [ - {file = "cron_descriptor-2.0.6-py3-none-any.whl", hash = "sha256:3a1c0d837c0e5a32e415f821b36cf758eb92d510e6beff8fbfe4fa16573d93d6"}, - {file = "cron_descriptor-2.0.6.tar.gz", hash = "sha256:e39d2848e1d8913cfb6e3452e701b5eec662ee18bea8cc5aa53ee1a7bb217157"}, -] - -[package.dependencies] -typing_extensions = "*" - -[package.extras] -dev = ["mypy", "polib", "ruff"] -test = ["pytest"] - [[package]] name = "cryptography" version = "45.0.5" @@ -843,42 +858,6 @@ tzdata = {version = "*", markers = "sys_platform == \"win32\""} argon2 = ["argon2-cffi (>=19.1.0)"] bcrypt = ["bcrypt"] -[[package]] -name = "django-celery-beat" -version = "2.8.0" -description = "Database-backed Periodic Tasks." -optional = false -python-versions = ">=3.8" -groups = ["main"] -files = [ - {file = "django_celery_beat-2.8.0-py3-none-any.whl", hash = "sha256:f8fd2e1ffbfa8e570ab9439383b2cd15a6642b347662d0de79c62ba6f68d4b38"}, - {file = "django_celery_beat-2.8.0.tar.gz", hash = "sha256:955bfb3c4b8f1026a8d20144d0da39c941e1eb23acbaee9e12a7e7cc1f74959a"}, -] - -[package.dependencies] -celery = ">=5.2.3,<6.0" -cron-descriptor = ">=1.2.32" -Django = ">=2.2,<6.0" -django-timezone-field = ">=5.0" -python-crontab = ">=2.3.4" -tzdata = "*" - -[[package]] -name = "django-celery-results" -version = "2.6.0" -description = "Celery result backends for Django." -optional = false -python-versions = "*" -groups = ["main"] -files = [ - {file = "django_celery_results-2.6.0-py3-none-any.whl", hash = "sha256:b9ccdca2695b98c7cbbb8dea742311ba9a92773d71d7b4944a676e69a7df1c73"}, - {file = "django_celery_results-2.6.0.tar.gz", hash = "sha256:9abcd836ae6b61063779244d8887a88fe80bbfaba143df36d3cb07034671277c"}, -] - -[package.dependencies] -celery = ">=5.2.7,<6.0" -Django = ">=3.2.25" - [[package]] name = "django-configurations" version = "2.5.1" @@ -939,6 +918,25 @@ maintainer = ["django", "zest.releaser[recommended]"] pyuca = ["pyuca"] test = ["djangorestframework", "graphene-django", "pytest", "pytest-cov", "pytest-django"] +[[package]] +name = "django-dramatiq" +version = "0.14.0" +description = "A Django app for Dramatiq." +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "django_dramatiq-0.14.0-py3-none-any.whl", hash = "sha256:78da4a5cccecf7aa099bf66a6d706269f2d0541056a4e9cdc1e54a043f38f4e1"}, + {file = "django_dramatiq-0.14.0.tar.gz", hash = "sha256:dd957fa3c3d5106830ce90f10f72a8f07249643c5f35ccfaee1b9a79f7f5d6dd"}, +] + +[package.dependencies] +django = ">=4.2" +dramatiq = ">=1.11" + +[package.extras] +dev = ["flake8", "flake8-quotes", "isort", "pytest", "pytest-cov", "pytest-django", "twine"] + [[package]] name = "django-extensions" version = "3.2.3" @@ -1131,6 +1129,78 @@ idna = ["idna (>=3.7)"] trio = ["trio (>=0.23)"] wmi = ["wmi (>=1.5.1)"] +[[package]] +name = "dramatiq" +version = "1.18.0" +description = "Background Processing for Python 3." +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "dramatiq-1.18.0-py3-none-any.whl", hash = "sha256:d360f608aa3cd06f5db714bfcd23825dc7098bacfee52aca536b0bb0faae3c69"}, + {file = "dramatiq-1.18.0.tar.gz", hash = "sha256:5ea436b6e50dae64d4de04f1eb519ad239a6b1ba6315ba1dce1c0c4c1ebedfaf"}, +] + +[package.dependencies] +prometheus-client = ">=0.2" +redis = {version = ">=2.0,<7.0", optional = true, markers = "extra == \"redis\""} + +[package.extras] +all = ["gevent (>=1.1)", "pika (>=1.0,<2.0)", "pylibmc (>=1.5,<2.0)", "redis (>=2.0,<7.0)", "watchdog (>=4.0)", "watchdog_gevent (>=0.2)"] +dev = ["alabaster", "bumpversion", "flake8", "flake8-bugbear", "flake8-quotes", "gevent (>=1.1)", "hiredis", "isort", "mypy", "pika (>=1.0,<2.0)", "pylibmc (>=1.5,<2.0)", "pytest", "pytest-benchmark[histogram]", "pytest-cov", "redis (>=2.0,<7.0)", "sphinx", "sphinxcontrib-napoleon", "tox", "twine", "watchdog (>=4.0)", "watchdog_gevent (>=0.2)", "wheel"] +gevent = ["gevent (>=1.1)"] +memcached = ["pylibmc (>=1.5,<2.0)"] +rabbitmq = ["pika (>=1.0,<2.0)"] +redis = ["redis (>=2.0,<7.0)"] +watch = ["watchdog (>=4.0)", "watchdog_gevent (>=0.2)"] + +[[package]] +name = "dramatiq-crontab" +version = "1.0.12" +description = "Cron style scheduler for asynchronous Dramatiq tasks in Django." +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "dramatiq_crontab-1.0.12-py3-none-any.whl", hash = "sha256:417d1da76fd423c02466db6e1091bb089cf76596331951447283fb9d6720ab8c"}, + {file = "dramatiq_crontab-1.0.12.tar.gz", hash = "sha256:6192289cb7fe16aa698cd6a78aa86e40ae4fb1fe043a55a028cfbf3c2299c8dd"}, +] + +[package.dependencies] +apscheduler = "*" +django = "*" +dramatiq = "*" +sentry-sdk = {version = "*", optional = true, markers = "extra == \"sentry\""} + +[package.extras] +lint = ["black (==25.1.0)", "ruff (==0.12.7)"] +redis = ["redis"] +sentry = ["sentry-sdk"] +test = ["backports.zoneinfo ; python_version < \"3.9\"", "dramatiq", "pytest", "pytest-cov", "pytest-django"] + +[[package]] +name = "dramatiq_dashboard" +version = "0.4.0" +description = "A dashboard for Dramatiq (Redis-only!)." +optional = false +python-versions = ">=3.6" +groups = ["main"] +files = [] +develop = false + +[package.dependencies] +dramatiq = {version = ">=1.6,<2.0", extras = ["redis"]} +jinja2 = ">=2" + +[package.extras] +dev = ["alabaster", "bumpversion", "flake8", "flake8-bugbear", "flake8-quotes", "hiredis", "isort", "pytest", "pytest-benchmark[histogram]", "pytest-cov", "sphinx (<1.8)", "sphinxcontrib-napoleon", "tox", "twine", "wheel"] + +[package.source] +type = "git" +url = "https://github.com/dbowring/dramatiq_dashboard" +reference = "128ee38e2b5efc36c4ae964a934cffc41baaf896" +resolved_reference = "128ee38e2b5efc36c4ae964a934cffc41baaf896" + [[package]] name = "drf-spectacular" version = "0.28.0" @@ -1464,6 +1534,24 @@ files = [ colors = ["colorama"] plugins = ["setuptools"] +[[package]] +name = "jinja2" +version = "3.1.6" +description = "A very fast and expressive template engine." +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67"}, + {file = "jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d"}, +] + +[package.dependencies] +MarkupSafe = ">=2.0" + +[package.extras] +i18n = ["Babel (>=2.7)"] + [[package]] name = "jiter" version = "0.10.0" @@ -1638,9 +1726,10 @@ typing-extensions = ">=4.5.0" name = "kombu" version = "5.5.4" description = "Messaging library for Python." -optional = false +optional = true python-versions = ">=3.8" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "kombu-5.5.4-py3-none-any.whl", hash = "sha256:a12ed0557c238897d8e518f1d1fdf84bd1516c5e305af2dacd85c2015115feb8"}, {file = "kombu-5.5.4.tar.gz", hash = "sha256:886600168275ebeada93b888e831352fe578168342f0d1d5833d88ba0d847363"}, @@ -1726,6 +1815,105 @@ profiling = ["gprof2dot"] rtd = ["ipykernel", "jupyter_sphinx", "mdit-py-plugins (>=0.5.0)", "myst-parser", "pyyaml", "sphinx", "sphinx-book-theme (>=1.0,<2.0)", "sphinx-copybutton", "sphinx-design"] testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions", "requests"] +[[package]] +name = "markupsafe" +version = "3.0.3" +description = "Safely add untrusted strings to HTML/XML markup." +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "markupsafe-3.0.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:2f981d352f04553a7171b8e44369f2af4055f888dfb147d55e42d29e29e74559"}, + {file = "markupsafe-3.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e1c1493fb6e50ab01d20a22826e57520f1284df32f2d8601fdd90b6304601419"}, + {file = "markupsafe-3.0.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1ba88449deb3de88bd40044603fafffb7bc2b055d626a330323a9ed736661695"}, + {file = "markupsafe-3.0.3-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f42d0984e947b8adf7dd6dde396e720934d12c506ce84eea8476409563607591"}, + {file = "markupsafe-3.0.3-cp310-cp310-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:c0c0b3ade1c0b13b936d7970b1d37a57acde9199dc2aecc4c336773e1d86049c"}, + {file = "markupsafe-3.0.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:0303439a41979d9e74d18ff5e2dd8c43ed6c6001fd40e5bf2e43f7bd9bbc523f"}, + {file = "markupsafe-3.0.3-cp310-cp310-musllinux_1_2_riscv64.whl", hash = "sha256:d2ee202e79d8ed691ceebae8e0486bd9a2cd4794cec4824e1c99b6f5009502f6"}, + {file = "markupsafe-3.0.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:177b5253b2834fe3678cb4a5f0059808258584c559193998be2601324fdeafb1"}, + {file = "markupsafe-3.0.3-cp310-cp310-win32.whl", hash = "sha256:2a15a08b17dd94c53a1da0438822d70ebcd13f8c3a95abe3a9ef9f11a94830aa"}, + {file = "markupsafe-3.0.3-cp310-cp310-win_amd64.whl", hash = "sha256:c4ffb7ebf07cfe8931028e3e4c85f0357459a3f9f9490886198848f4fa002ec8"}, + {file = "markupsafe-3.0.3-cp310-cp310-win_arm64.whl", hash = "sha256:e2103a929dfa2fcaf9bb4e7c091983a49c9ac3b19c9061b6d5427dd7d14d81a1"}, + {file = "markupsafe-3.0.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:1cc7ea17a6824959616c525620e387f6dd30fec8cb44f649e31712db02123dad"}, + {file = "markupsafe-3.0.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4bd4cd07944443f5a265608cc6aab442e4f74dff8088b0dfc8238647b8f6ae9a"}, + {file = "markupsafe-3.0.3-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6b5420a1d9450023228968e7e6a9ce57f65d148ab56d2313fcd589eee96a7a50"}, + {file = "markupsafe-3.0.3-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0bf2a864d67e76e5c9a34dc26ec616a66b9888e25e7b9460e1c76d3293bd9dbf"}, + {file = "markupsafe-3.0.3-cp311-cp311-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:bc51efed119bc9cfdf792cdeaa4d67e8f6fcccab66ed4bfdd6bde3e59bfcbb2f"}, + {file = "markupsafe-3.0.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:068f375c472b3e7acbe2d5318dea141359e6900156b5b2ba06a30b169086b91a"}, + {file = "markupsafe-3.0.3-cp311-cp311-musllinux_1_2_riscv64.whl", hash = "sha256:7be7b61bb172e1ed687f1754f8e7484f1c8019780f6f6b0786e76bb01c2ae115"}, + {file = "markupsafe-3.0.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:f9e130248f4462aaa8e2552d547f36ddadbeaa573879158d721bbd33dfe4743a"}, + {file = "markupsafe-3.0.3-cp311-cp311-win32.whl", hash = "sha256:0db14f5dafddbb6d9208827849fad01f1a2609380add406671a26386cdf15a19"}, + {file = "markupsafe-3.0.3-cp311-cp311-win_amd64.whl", hash = "sha256:de8a88e63464af587c950061a5e6a67d3632e36df62b986892331d4620a35c01"}, + {file = "markupsafe-3.0.3-cp311-cp311-win_arm64.whl", hash = "sha256:3b562dd9e9ea93f13d53989d23a7e775fdfd1066c33494ff43f5418bc8c58a5c"}, + {file = "markupsafe-3.0.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:d53197da72cc091b024dd97249dfc7794d6a56530370992a5e1a08983ad9230e"}, + {file = "markupsafe-3.0.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1872df69a4de6aead3491198eaf13810b565bdbeec3ae2dc8780f14458ec73ce"}, + {file = "markupsafe-3.0.3-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3a7e8ae81ae39e62a41ec302f972ba6ae23a5c5396c8e60113e9066ef893da0d"}, + {file = "markupsafe-3.0.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d6dd0be5b5b189d31db7cda48b91d7e0a9795f31430b7f271219ab30f1d3ac9d"}, + {file = "markupsafe-3.0.3-cp312-cp312-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:94c6f0bb423f739146aec64595853541634bde58b2135f27f61c1ffd1cd4d16a"}, + {file = "markupsafe-3.0.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:be8813b57049a7dc738189df53d69395eba14fb99345e0a5994914a3864c8a4b"}, + {file = "markupsafe-3.0.3-cp312-cp312-musllinux_1_2_riscv64.whl", hash = "sha256:83891d0e9fb81a825d9a6d61e3f07550ca70a076484292a70fde82c4b807286f"}, + {file = "markupsafe-3.0.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:77f0643abe7495da77fb436f50f8dab76dbc6e5fd25d39589a0f1fe6548bfa2b"}, + {file = "markupsafe-3.0.3-cp312-cp312-win32.whl", hash = "sha256:d88b440e37a16e651bda4c7c2b930eb586fd15ca7406cb39e211fcff3bf3017d"}, + {file = "markupsafe-3.0.3-cp312-cp312-win_amd64.whl", hash = "sha256:26a5784ded40c9e318cfc2bdb30fe164bdb8665ded9cd64d500a34fb42067b1c"}, + {file = "markupsafe-3.0.3-cp312-cp312-win_arm64.whl", hash = "sha256:35add3b638a5d900e807944a078b51922212fb3dedb01633a8defc4b01a3c85f"}, + {file = "markupsafe-3.0.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:e1cf1972137e83c5d4c136c43ced9ac51d0e124706ee1c8aa8532c1287fa8795"}, + {file = "markupsafe-3.0.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:116bb52f642a37c115f517494ea5feb03889e04df47eeff5b130b1808ce7c219"}, + {file = "markupsafe-3.0.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:133a43e73a802c5562be9bbcd03d090aa5a1fe899db609c29e8c8d815c5f6de6"}, + {file = "markupsafe-3.0.3-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ccfcd093f13f0f0b7fdd0f198b90053bf7b2f02a3927a30e63f3ccc9df56b676"}, + {file = "markupsafe-3.0.3-cp313-cp313-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:509fa21c6deb7a7a273d629cf5ec029bc209d1a51178615ddf718f5918992ab9"}, + {file = "markupsafe-3.0.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:a4afe79fb3de0b7097d81da19090f4df4f8d3a2b3adaa8764138aac2e44f3af1"}, + {file = "markupsafe-3.0.3-cp313-cp313-musllinux_1_2_riscv64.whl", hash = "sha256:795e7751525cae078558e679d646ae45574b47ed6e7771863fcc079a6171a0fc"}, + {file = "markupsafe-3.0.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:8485f406a96febb5140bfeca44a73e3ce5116b2501ac54fe953e488fb1d03b12"}, + {file = "markupsafe-3.0.3-cp313-cp313-win32.whl", hash = "sha256:bdd37121970bfd8be76c5fb069c7751683bdf373db1ed6c010162b2a130248ed"}, + {file = "markupsafe-3.0.3-cp313-cp313-win_amd64.whl", hash = "sha256:9a1abfdc021a164803f4d485104931fb8f8c1efd55bc6b748d2f5774e78b62c5"}, + {file = "markupsafe-3.0.3-cp313-cp313-win_arm64.whl", hash = "sha256:7e68f88e5b8799aa49c85cd116c932a1ac15caaa3f5db09087854d218359e485"}, + {file = "markupsafe-3.0.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:218551f6df4868a8d527e3062d0fb968682fe92054e89978594c28e642c43a73"}, + {file = "markupsafe-3.0.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:3524b778fe5cfb3452a09d31e7b5adefeea8c5be1d43c4f810ba09f2ceb29d37"}, + {file = "markupsafe-3.0.3-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:4e885a3d1efa2eadc93c894a21770e4bc67899e3543680313b09f139e149ab19"}, + {file = "markupsafe-3.0.3-cp313-cp313t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8709b08f4a89aa7586de0aadc8da56180242ee0ada3999749b183aa23df95025"}, + {file = "markupsafe-3.0.3-cp313-cp313t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:b8512a91625c9b3da6f127803b166b629725e68af71f8184ae7e7d54686a56d6"}, + {file = "markupsafe-3.0.3-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:9b79b7a16f7fedff2495d684f2b59b0457c3b493778c9eed31111be64d58279f"}, + {file = "markupsafe-3.0.3-cp313-cp313t-musllinux_1_2_riscv64.whl", hash = "sha256:12c63dfb4a98206f045aa9563db46507995f7ef6d83b2f68eda65c307c6829eb"}, + {file = "markupsafe-3.0.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:8f71bc33915be5186016f675cd83a1e08523649b0e33efdb898db577ef5bb009"}, + {file = "markupsafe-3.0.3-cp313-cp313t-win32.whl", hash = "sha256:69c0b73548bc525c8cb9a251cddf1931d1db4d2258e9599c28c07ef3580ef354"}, + {file = "markupsafe-3.0.3-cp313-cp313t-win_amd64.whl", hash = "sha256:1b4b79e8ebf6b55351f0d91fe80f893b4743f104bff22e90697db1590e47a218"}, + {file = "markupsafe-3.0.3-cp313-cp313t-win_arm64.whl", hash = "sha256:ad2cf8aa28b8c020ab2fc8287b0f823d0a7d8630784c31e9ee5edea20f406287"}, + {file = "markupsafe-3.0.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:eaa9599de571d72e2daf60164784109f19978b327a3910d3e9de8c97b5b70cfe"}, + {file = "markupsafe-3.0.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:c47a551199eb8eb2121d4f0f15ae0f923d31350ab9280078d1e5f12b249e0026"}, + {file = "markupsafe-3.0.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f34c41761022dd093b4b6896d4810782ffbabe30f2d443ff5f083e0cbbb8c737"}, + {file = "markupsafe-3.0.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:457a69a9577064c05a97c41f4e65148652db078a3a509039e64d3467b9e7ef97"}, + {file = "markupsafe-3.0.3-cp314-cp314-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:e8afc3f2ccfa24215f8cb28dcf43f0113ac3c37c2f0f0806d8c70e4228c5cf4d"}, + {file = "markupsafe-3.0.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:ec15a59cf5af7be74194f7ab02d0f59a62bdcf1a537677ce67a2537c9b87fcda"}, + {file = "markupsafe-3.0.3-cp314-cp314-musllinux_1_2_riscv64.whl", hash = "sha256:0eb9ff8191e8498cca014656ae6b8d61f39da5f95b488805da4bb029cccbfbaf"}, + {file = "markupsafe-3.0.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:2713baf880df847f2bece4230d4d094280f4e67b1e813eec43b4c0e144a34ffe"}, + {file = "markupsafe-3.0.3-cp314-cp314-win32.whl", hash = "sha256:729586769a26dbceff69f7a7dbbf59ab6572b99d94576a5592625d5b411576b9"}, + {file = "markupsafe-3.0.3-cp314-cp314-win_amd64.whl", hash = "sha256:bdc919ead48f234740ad807933cdf545180bfbe9342c2bb451556db2ed958581"}, + {file = "markupsafe-3.0.3-cp314-cp314-win_arm64.whl", hash = "sha256:5a7d5dc5140555cf21a6fefbdbf8723f06fcd2f63ef108f2854de715e4422cb4"}, + {file = "markupsafe-3.0.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:1353ef0c1b138e1907ae78e2f6c63ff67501122006b0f9abad68fda5f4ffc6ab"}, + {file = "markupsafe-3.0.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:1085e7fbddd3be5f89cc898938f42c0b3c711fdcb37d75221de2666af647c175"}, + {file = "markupsafe-3.0.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1b52b4fb9df4eb9ae465f8d0c228a00624de2334f216f178a995ccdcf82c4634"}, + {file = "markupsafe-3.0.3-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:fed51ac40f757d41b7c48425901843666a6677e3e8eb0abcff09e4ba6e664f50"}, + {file = "markupsafe-3.0.3-cp314-cp314t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:f190daf01f13c72eac4efd5c430a8de82489d9cff23c364c3ea822545032993e"}, + {file = "markupsafe-3.0.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:e56b7d45a839a697b5eb268c82a71bd8c7f6c94d6fd50c3d577fa39a9f1409f5"}, + {file = "markupsafe-3.0.3-cp314-cp314t-musllinux_1_2_riscv64.whl", hash = "sha256:f3e98bb3798ead92273dc0e5fd0f31ade220f59a266ffd8a4f6065e0a3ce0523"}, + {file = "markupsafe-3.0.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:5678211cb9333a6468fb8d8be0305520aa073f50d17f089b5b4b477ea6e67fdc"}, + {file = "markupsafe-3.0.3-cp314-cp314t-win32.whl", hash = "sha256:915c04ba3851909ce68ccc2b8e2cd691618c4dc4c4232fb7982bca3f41fd8c3d"}, + {file = "markupsafe-3.0.3-cp314-cp314t-win_amd64.whl", hash = "sha256:4faffd047e07c38848ce017e8725090413cd80cbc23d86e55c587bf979e579c9"}, + {file = "markupsafe-3.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:32001d6a8fc98c8cb5c947787c5d08b0a50663d139f1305bac5885d98d9b40fa"}, + {file = "markupsafe-3.0.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:15d939a21d546304880945ca1ecb8a039db6b4dc49b2c5a400387cdae6a62e26"}, + {file = "markupsafe-3.0.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f71a396b3bf33ecaa1626c255855702aca4d3d9fea5e051b41ac59a9c1c41edc"}, + {file = "markupsafe-3.0.3-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0f4b68347f8c5eab4a13419215bdfd7f8c9b19f2b25520968adfad23eb0ce60c"}, + {file = "markupsafe-3.0.3-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e8fc20152abba6b83724d7ff268c249fa196d8259ff481f3b1476383f8f24e42"}, + {file = "markupsafe-3.0.3-cp39-cp39-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:949b8d66bc381ee8b007cd945914c721d9aba8e27f71959d750a46f7c282b20b"}, + {file = "markupsafe-3.0.3-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:3537e01efc9d4dccdf77221fb1cb3b8e1a38d5428920e0657ce299b20324d758"}, + {file = "markupsafe-3.0.3-cp39-cp39-musllinux_1_2_riscv64.whl", hash = "sha256:591ae9f2a647529ca990bc681daebdd52c8791ff06c2bfa05b65163e28102ef2"}, + {file = "markupsafe-3.0.3-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:a320721ab5a1aba0a233739394eb907f8c8da5c98c9181d1161e77a0c8e36f2d"}, + {file = "markupsafe-3.0.3-cp39-cp39-win32.whl", hash = "sha256:df2449253ef108a379b8b5d6b43f4b1a8e81a061d6537becd5582fba5f9196d7"}, + {file = "markupsafe-3.0.3-cp39-cp39-win_amd64.whl", hash = "sha256:7c3fb7d25180895632e5d3148dbdc29ea38ccb7fd210aa27acbd1201a1902c6e"}, + {file = "markupsafe-3.0.3-cp39-cp39-win_arm64.whl", hash = "sha256:38664109c14ffc9e7437e86b4dceb442b0096dfe3541d7864d9cbe1da4cf36c8"}, + {file = "markupsafe-3.0.3.tar.gz", hash = "sha256:722695808f4b6457b320fdc131280796bdceb04ab50fe1795cd540799ebe1698"}, +] + [[package]] name = "mccabe" version = "0.7.0" @@ -2115,9 +2303,10 @@ twisted = ["twisted"] name = "prompt-toolkit" version = "3.0.52" description = "Library for building powerful interactive command lines in Python" -optional = false +optional = true python-versions = ">=3.8" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "prompt_toolkit-3.0.52-py3-none-any.whl", hash = "sha256:9aac639a3bbd33284347de5ad8d68ecc044b91a762dc39b7c21095fcd6a19955"}, {file = "prompt_toolkit-3.0.52.tar.gz", hash = "sha256:28cde192929c8e7321de85de1ddbe736f1375148b02f2e17edd840042b1be855"}, @@ -2644,22 +2833,6 @@ psutil = ["psutil (>=3.0)"] setproctitle = ["setproctitle"] testing = ["filelock"] -[[package]] -name = "python-crontab" -version = "3.3.0" -description = "Python Crontab API" -optional = false -python-versions = "*" -groups = ["main"] -files = [ - {file = "python_crontab-3.3.0-py3-none-any.whl", hash = "sha256:739a778b1a771379b75654e53fd4df58e5c63a9279a63b5dfe44c0fcc3ee7884"}, - {file = "python_crontab-3.3.0.tar.gz", hash = "sha256:007c8aee68dddf3e04ec4dce0fac124b93bd68be7470fc95d2a9617a15de291b"}, -] - -[package.extras] -cron-description = ["cron-descriptor"] -cron-schedule = ["croniter"] - [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -3540,6 +3713,24 @@ files = [ {file = "tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9"}, ] +[[package]] +name = "tzlocal" +version = "5.3.1" +description = "tzinfo object for the local timezone" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d"}, + {file = "tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd"}, +] + +[package.dependencies] +tzdata = {version = "*", markers = "platform_system == \"Windows\""} + +[package.extras] +devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"] + [[package]] name = "uritemplate" version = "4.2.0" @@ -3589,9 +3780,10 @@ zstd = ["zstandard (>=0.18.0)"] name = "vine" version = "5.1.0" description = "Python promises." -optional = false +optional = true python-versions = ">=3.6" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "vine-5.1.0-py3-none-any.whl", hash = "sha256:40fdf3c48b2cfe1c38a49e9ae2da6fda88e4794c810050a728bd7413811fb1dc"}, {file = "vine-5.1.0.tar.gz", hash = "sha256:8b62e981d35c41049211cf62a0a1242d8c1ee9bd15bb196ce38aefd6799e61e0"}, @@ -3601,9 +3793,10 @@ files = [ name = "wcwidth" version = "0.2.13" description = "Measures the displayed width of unicode strings in a terminal" -optional = false +optional = true python-versions = "*" groups = ["main"] +markers = "extra == \"dev\"" files = [ {file = "wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859"}, {file = "wcwidth-0.2.13.tar.gz", hash = "sha256:72ea0c06399eb286d978fdedb6923a9eb47e1c486ce63e9b4e64fc18303972b5"}, @@ -3649,4 +3842,4 @@ dev = ["django-extensions", "drf-spectacular-sidecar", "flower", "pip-audit", "p [metadata] lock-version = "2.1" python-versions = ">=3.13,<4.0" -content-hash = "b02eb7943a2f100953cbb14c779c0d3ac8c65e26bbbc4c5bf8e48f401680ed7e" +content-hash = "dd6f59bb1028c7d2f4122007674da97242251b060608296f1b2712d7a41f9f44" diff --git a/src/backend/pyproject.toml b/src/backend/pyproject.toml index e14893d61..fbf9068a9 100644 --- a/src/backend/pyproject.toml +++ b/src/backend/pyproject.toml @@ -5,7 +5,7 @@ [project] name = "messages-backend" version = "0.0.1" -authors = [{ "name" = "ANCT", "email" = "suiteterritoriale@anct.gouv.fr" }] +authors = [{ "name" = "ANCT", "email" = "contact@suite.anct.gouv.fr" }] classifiers = [ "Development Status :: 5 - Production/Stable", "Framework :: Django", @@ -26,12 +26,13 @@ requires-python = ">=3.13,<4.0" dependencies = [ "boto3==1.40.43", "botocore==1.40.43", - "celery[redis]==5.5.2", "cryptography==45.0.5", "dj-database-url==2.3.0", "django==5.1.13", - "django-celery-beat==2.8.0", - "django-celery-results==2.6.0", + "django-dramatiq==0.14.0", + "dramatiq[redis]==1.18.0", + "dramatiq-dashboard@git+https://github.com/dbowring/dramatiq_dashboard@128ee38e2b5efc36c4ae964a934cffc41baaf896", + "dramatiq-crontab[sentry]==1.0.12", "django-configurations==2.5.1", "django-cors-headers==4.6.0", "django-countries==7.6.1",