Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions src/backend/core/api/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -4133,6 +4133,91 @@
}
}
},
"/api/v1.0/send/cancel/": {
"post": {
"operationId": "send_cancel_create",
"description": "\n Cancel a queued send task and keep the message as a draft.\n\n This endpoint revokes the Celery task responsible for sending the message\n and ensures the message remains in draft state.\n ",
"tags": [
"messages"
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/CancelSendRequest"
},
"examples": {
"CancelSend": {
"value": {
"taskId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"messageId": "123e4567-e89b-12d3-a456-426614174000"
},
"summary": "Cancel Send"
}
}
},
"multipart/form-data": {
"schema": {
"$ref": "#/components/schemas/CancelSendRequest"
}
}
},
"required": true
},
"security": [
{
"cookieAuth": []
}
],
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"type": "object",
"additionalProperties": {},
"description": "Unspecified response body"
},
"examples": {
"CancelSend": {
"value": {
"taskId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"messageId": "123e4567-e89b-12d3-a456-426614174000"
},
"summary": "Cancel Send"
}
}
}
},
"description": ""
},
"400": {
"content": {
"application/json": {
"schema": {
"type": "object",
"additionalProperties": {},
"description": "Unspecified response body"
}
}
},
"description": ""
},
"404": {
"content": {
"application/json": {
"schema": {
"type": "object",
"additionalProperties": {},
"description": "Unspecified response body"
}
}
},
"description": ""
}
}
}
},
"/api/v1.0/tasks/{task_id}/": {
"get": {
"operationId": "tasks_retrieve",
Expand Down Expand Up @@ -5119,6 +5204,26 @@
"type"
]
},
"CancelSendRequest": {
"type": "object",
"description": "Serializer for canceling send tasks.",
"properties": {
"taskId": {
"type": "string",
"minLength": 1,
"description": "Celery task ID to cancel"
},
"messageId": {
"type": "string",
"format": "uuid",
"description": "Message ID to keep as draft"
}
},
"required": [
"messageId",
"taskId"
]
},
"ChangeFlagRequestRequest": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -7109,6 +7214,12 @@
},
"htmlBody": {
"type": "string"
},
"delay": {
"type": "integer",
"maximum": 30,
"minimum": 0,
"default": 0
}
},
"required": [
Expand Down
3 changes: 2 additions & 1 deletion src/backend/core/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1460,9 +1460,10 @@ class SendMessageSerializer(serializers.Serializer):
archive = serializers.BooleanField(required=False, default=False)
textBody = serializers.CharField(required=False, allow_blank=True)
htmlBody = serializers.CharField(required=False, allow_blank=True)
delay = serializers.IntegerField(required=False, default=0, min_value=0, max_value=30)

class Meta:
fields = ["messageId", "senderId", "archive", "textBody", "htmlBody"]
fields = ["messageId", "senderId", "archive", "textBody", "htmlBody", "delay"]

def create(self, validated_data):
"""This serializer is only used to validate the data, not to create or update."""
Expand Down
119 changes: 119 additions & 0 deletions src/backend/core/api/viewsets/cancel_send.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""API ViewSet for canceling queued send tasks."""

import logging

from drf_spectacular.utils import OpenApiExample, extend_schema
from rest_framework import exceptions as drf_exceptions
from rest_framework import serializers as drf_serializers
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from core import models
from core.mda.tasks import celery_app

from .. import permissions

logger = logging.getLogger(__name__)


class CancelSendSerializer(drf_serializers.Serializer):
"""Serializer for canceling send tasks."""

taskId = drf_serializers.CharField(required=True, help_text="Celery task ID to cancel")
messageId = drf_serializers.UUIDField(required=True, help_text="Message ID to keep as draft")


@extend_schema(
tags=["messages"],
request=CancelSendSerializer,
responses={
200: OpenApiExample(
"Success",
value={"detail": "Send cancelled successfully."},
),
400: OpenApiExample(
"Validation Error",
value={"detail": "Invalid request data."},
),
404: OpenApiExample(
"Not Found",
value={"detail": "Message not found."},
),
},
description="""
Cancel a queued send task and keep the message as a draft.

This endpoint revokes the Celery task responsible for sending the message
and ensures the message remains in draft state.
""",
examples=[
OpenApiExample(
"Cancel Send",
value={
"taskId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"messageId": "123e4567-e89b-12d3-a456-426614174000",
},
),
],
)
class CancelSendView(APIView):
"""Cancel a queued send task."""

permission_classes = [permissions.IsAllowedToAccess]

def post(self, request):
"""Cancel a queued send task."""
serializer = CancelSendSerializer(data=request.data)
serializer.is_valid(raise_exception=True)

task_id = serializer.validated_data.get("taskId")
message_id = serializer.validated_data.get("messageId")

# Ensure message stays as draft
try:
message = models.Message.objects.get(id=message_id)

# Check if already sent - if draft_blob is gone, it's too late
if not message.draft_blob:
raise drf_exceptions.ValidationError(
"Cannot cancel: message has already been sent."
)

Comment on lines +65 to +82
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Missing object-level permission check.

The view fetches the message but never validates that the requesting user has permission to cancel it. Looking at the send view (src/backend/core/api/viewsets/send.py line 103), it calls self.check_object_permissions(request, message) after fetching.

Add permission check after fetching the message:

         try:
             message = models.Message.objects.get(id=message_id)
+            self.check_object_permissions(request, message)
 
             # Check if already sent - if draft_blob is gone, it's too late
             if not message.draft_blob:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def post(self, request):
"""Cancel a queued send task."""
serializer = CancelSendSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
task_id = serializer.validated_data.get("taskId")
message_id = serializer.validated_data.get("messageId")
# Ensure message stays as draft
try:
message = models.Message.objects.get(id=message_id)
# Check if already sent - if draft_blob is gone, it's too late
if not message.draft_blob:
raise drf_exceptions.ValidationError(
"Cannot cancel: message has already been sent."
)
def post(self, request):
"""Cancel a queued send task."""
serializer = CancelSendSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
task_id = serializer.validated_data.get("taskId")
message_id = serializer.validated_data.get("messageId")
# Ensure message stays as draft
try:
message = models.Message.objects.get(id=message_id)
self.check_object_permissions(request, message)
# Check if already sent - if draft_blob is gone, it's too late
if not message.draft_blob:
raise drf_exceptions.ValidationError(
"Cannot cancel: message has already been sent."
)
🤖 Prompt for AI Agents
In src/backend/core/api/viewsets/cancel_send.py around lines 65 to 82, after
fetching the message object you must perform an object-level permission check;
add a call to self.check_object_permissions(request, message) immediately after
message = models.Message.objects.get(id=message_id) so the view enforces the
same permissions as the send view before proceeding to validate draft_blob and
cancel.

celery_app.control.revoke(task_id, terminate=True)
Copy link

@coderabbitai coderabbitai bot Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Task revocation with terminate=True could disrupt email sending.

Using terminate=True sends SIGTERM to the worker, which forcefully kills the task even if it's mid-execution. If the task has already started sending the email (SMTP connection open, partial transmission), this could leave the system in an inconsistent state or cause email corruption.

Consider:

  1. First try revoking without terminate: celery_app.control.revoke(task_id, terminate=False)
  2. Check if the message state has changed (refresh from DB and re-check draft_blob)
  3. Only use terminate=True as a last resort if the message is still in a safe state

Alternatively, design the Celery task to check a cancellation flag at the start of execution and abort if set, avoiding the need for terminate=True.

🤖 Prompt for AI Agents
In src/backend/core/api/viewsets/cancel_send.py around line 84, the code
currently calls celery_app.control.revoke(task_id, terminate=True) which
forcefully kills tasks; update the flow to first call revoke with
terminate=False, then reload the message record from the database and re-check
draft_blob/state to determine whether the send has begun; only if the refreshed
state indicates it is still safe to forcibly stop (or if a
timeout/failed-soft-revoke occurs) call revoke with terminate=True as a last
resort; additionally, modify the task contract to respect a cancellable flag
(check a DB cancellation field at start and abort cleanly) so terminate=True is
rarely needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with coderabbit here, we need to cancel from the task, not from the outside

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sylvinus Understood! You're right — canceling from within the task is much safer and more predictable than external termination.

Here's the recommended approach:

Implementation Strategy

1. Add a cancellation flag to the Message model

Add a field to track cancellation intent:

# In models.py
is_send_cancelled = models.BooleanField(default=False)

2. Modify the cancel_send view

Instead of revoking the task, set the cancellation flag:

# In cancel_send.py
def post(self, request):
    # ... validation ...
    
    message = models.Message.objects.get(id=message_id)
    
    if not message.draft_blob:
        raise drf_exceptions.ValidationError(
            "Cannot cancel: message has already been sent."
        )
    
    # Set cancellation flag instead of revoking
    message.is_send_cancelled = True
    message.save(update_fields=["is_send_cancelled"])
    
    # Optional: still call revoke without terminate for cleanup
    celery_app.control.revoke(task_id, terminate=False)
    
    return Response({"detail": "Send cancelled successfully."})

3. Check the flag at the start of the Celery task

In send_message task (or wrapper), check before any SMTP operations:

# In tasks.py or outbound.py
@celery_app.task
def send_message_task(message_id, ...):
    message = Message.objects.get(id=message_id)
    
    # Check cancellation flag BEFORE any send operations
    if message.is_send_cancelled:
        logger.info(f"Send cancelled for message {message_id}, keeping as draft")
        # Clean up: reset the flag for potential future sends
        message.is_send_cancelled = False
        message.save(update_fields=["is_send_cancelled"])
        return  # Abort cleanly
    
    # Proceed with normal send flow
    send_message(message, ...)

This approach ensures:

  • ✅ No risk of interrupting mid-send
  • ✅ Clean state management
  • ✅ Task checks cancellation before touching SMTP
  • ✅ Predictable behavior with proper logging

Would you like me to help draft the complete implementation with these changes?

logger.info(f"Revoked Celery task {task_id} for message {message_id}")

needs_save = False
update_fields = []

if not message.is_draft:
message.is_draft = True
needs_save = True
update_fields.append("is_draft")

# Remove MIME blob - drafts shouldn't have one
if message.blob:
try:
message.blob.delete()
logger.info(f"Deleted blob {message.blob_id} for cancelled message {message_id}")
except Exception as e:
logger.warning(f"Failed to delete blob for message {message_id}: {e}")

message.blob = None
needs_save = True
update_fields.append("blob")

if needs_save:
message.save(update_fields=update_fields)
message.thread.update_stats()
logger.info(f"Message {message_id} reverted to draft state")
Comment on lines +106 to +109
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential AttributeError if message.thread is None.

Line 108 calls message.thread.update_stats() without verifying that thread is not None. If the foreign key allows null values, this will raise an AttributeError.

Add a null check before calling thread methods:

             if needs_save:
                 message.save(update_fields=update_fields)
-                message.thread.update_stats()
+                if message.thread:
+                    message.thread.update_stats()
                 logger.info(f"Message {message_id} reverted to draft state")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if needs_save:
message.save(update_fields=update_fields)
message.thread.update_stats()
logger.info(f"Message {message_id} reverted to draft state")
if needs_save:
message.save(update_fields=update_fields)
if message.thread:
message.thread.update_stats()
logger.info(f"Message {message_id} reverted to draft state")
🤖 Prompt for AI Agents
In src/backend/core/api/viewsets/cancel_send.py around lines 106 to 109, calling
message.thread.update_stats() can raise AttributeError if message.thread is
None; add a null check before invoking thread methods (for example, if
message.thread is not None: message.thread.update_stats()) so update_stats is
only called when a related thread exists, preserving the existing
save/update_fields and log behavior.

else:
logger.info(f"Message {message_id} was already in draft state")
except models.Message.DoesNotExist as e:
logger.warning(f"Message {message_id} not found, but task {task_id} was revoked")
raise drf_exceptions.NotFound("Message not found.") from e

return Response(
{"detail": "Send cancelled successfully."},
status=status.HTTP_200_OK,
)
30 changes: 24 additions & 6 deletions src/backend/core/api/viewsets/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def post(self, request):
message_id = serializer.validated_data.get("messageId")
sender_id = serializer.validated_data.get("senderId")
must_archive = serializer.validated_data.get("archive", False) is True
delay = serializer.validated_data.get("delay", 0)

try:
mailbox_sender = models.Mailbox.objects.get(id=sender_id)
Expand Down Expand Up @@ -120,14 +121,31 @@ def post(self, request):
)

# Launch async task for sending the message
task = send_message_task.delay(str(message.id), must_archive=must_archive)
if delay > 0:
# For delayed send, keep message as draft until task executes
task = send_message_task.apply_async(
args=[str(message.id)],
kwargs={"must_archive": must_archive},
countdown=delay,
)
else:
# For immediate send, mark as non-draft now (original behavior)
message.is_draft = False
message.save(update_fields=["is_draft"])
task = send_message_task.delay(str(message.id), must_archive=must_archive)

# --- Finalize ---
# Message state should be updated by prepare_outbound_message/send_message
# Refresh from DB to get final state (e.g., is_draft=False)
# Refresh from DB to get final state
message.refresh_from_db()

# Update thread stats after un-drafting
message.thread.update_stats()
# Update thread stats after un-drafting (only if not delayed)
if delay == 0:
message.thread.update_stats()

# Serialize the message to return in response
message_serializer = serializers.MessageSerializer(message)

return Response({"task_id": task.id}, status=status.HTTP_200_OK)
return Response(
{"task_id": task.id, "message": message_serializer.data},
status=status.HTTP_200_OK,
)
26 changes: 14 additions & 12 deletions src/backend/core/mda/outbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,28 +262,20 @@ def prepare_outbound_message(
content_type="message/rfc822",
)

draft_blob = message.draft_blob

# Preserve draft_blob for undo functionality (cleaned up in send_message)
message.blob = blob
message.is_draft = False
message.draft_blob = None
message.created_at = timezone.now()
message.updated_at = timezone.now()
message.save(
update_fields=[
"updated_at",
"blob",
"mime_id",
"is_draft",
"draft_blob",
"created_at",
]
)
message.thread.update_stats()

# Clean up the draft blob and the attachment blobs
if draft_blob:
draft_blob.delete()
for attachment in message.attachments.all():
if attachment.blob:
attachment.blob.delete()
Expand All @@ -298,9 +290,19 @@ def send_message(message: models.Message, force_mta_out: bool = False):
This part is called asynchronously from the celery worker.
"""

# Refuse to send messages that are draft or not senders
if message.is_draft:
raise ValueError("Cannot send a draft message")
message.is_draft = False
message.save(update_fields=["is_draft"])

if message.draft_blob:
try:
message.draft_blob.delete()
message.draft_blob = None
message.save(update_fields=["draft_blob"])
except Exception as e:
logger.warning(f"Failed to cleanup draft_blob for message {message.id}: {e}")

# Refuse to send messages we are not sender of
if not message.is_sender:
raise ValueError("Cannot send a message we are not sender of")

Expand Down Expand Up @@ -470,8 +472,8 @@ def _mark_delivered(
True,
)
finally:
# Always release the lock when done
cache.delete(lock_key)
message.thread.update_stats()


def send_outbound_message(
Expand Down
6 changes: 6 additions & 0 deletions src/backend/core/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from core.api.viewsets.metrics import MailDomainUsersMetricsApiView
from core.api.viewsets.placeholder import PlaceholderView
from core.api.viewsets.send import SendMessageView
from core.api.viewsets.cancel_send import CancelSendView
from core.api.viewsets.task import TaskDetailView
from core.api.viewsets.thread import ThreadViewSet
from core.api.viewsets.thread_access import ThreadAccessViewSet
Expand Down Expand Up @@ -174,6 +175,11 @@
SendMessageView.as_view(),
name="send-message",
),
path(
f"api/{settings.API_VERSION}/send/cancel/",
CancelSendView.as_view(),
name="cancel-send",
),
path(
f"api/{settings.API_VERSION}/tasks/<str:task_id>/",
TaskDetailView.as_view(),
Expand Down
Loading
Loading