-
Notifications
You must be signed in to change notification settings - Fork 11
✨(worker) switch from Celery to Dramatiq #390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| web: bin/scalingo_run_web | ||
| worker: celery -A messages.celery_app worker --task-events --beat -l INFO -c $CELERY_CONCURRENCY -Q celery,default | ||
| worker: python manage.py worker | ||
| scheduler: python manage.py crontab | ||
| postdeploy: python manage.py migrate |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,9 @@ | ||
| """API ViewSet for Celery task status.""" | ||
| """API ViewSet for asynchronous task statuses.""" | ||
|
|
||
| import logging | ||
|
|
||
| from celery import states as celery_states | ||
| from celery.result import AsyncResult | ||
| import dramatiq | ||
| from dramatiq.results import ResultFailure, ResultMissing, ResultTimeout | ||
| from drf_spectacular.utils import ( | ||
| OpenApiExample, | ||
| extend_schema, | ||
|
|
@@ -14,10 +14,13 @@ | |
| from rest_framework.response import Response | ||
| from rest_framework.views import APIView | ||
|
|
||
| from messages.celery_app import app as celery_app | ||
| from core.utils import get_task_progress | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # Map Dramatiq states to Celery-like states for frontend compatibility | ||
| DRAMATIQ_STATES = ["PENDING", "SUCCESS", "FAILURE", "RETRY", "REJECTED", "PROGRESS"] | ||
|
|
||
|
|
||
| @extend_schema( | ||
| tags=["tasks"], | ||
|
|
@@ -34,9 +37,7 @@ | |
| 200: inline_serializer( | ||
| name="TaskStatusResponse", | ||
| fields={ | ||
| "status": drf_serializers.ChoiceField( | ||
| choices=sorted({*celery_states.ALL_STATES, "PROGRESS"}) | ||
| ), | ||
| "status": drf_serializers.ChoiceField(choices=sorted(DRAMATIQ_STATES)), | ||
| "result": drf_serializers.JSONField(allow_null=True), | ||
| "error": drf_serializers.CharField(allow_null=True), | ||
| }, | ||
|
|
@@ -59,38 +60,65 @@ | |
| ], | ||
| ) | ||
| class TaskDetailView(APIView): | ||
| """View to retrieve the status of a Celery task.""" | ||
| """View to retrieve the status of a task.""" | ||
|
|
||
| permission_classes = [permissions.IsAuthenticated] | ||
|
|
||
| def get(self, request, task_id): | ||
| """Get the status of a Celery task.""" | ||
|
|
||
| task_result = AsyncResult(task_id, app=celery_app) | ||
| """Get the status of a task.""" | ||
| # Try to fetch a result from Dramatiq's result backend | ||
| broker = dramatiq.get_broker() | ||
| result_backend = broker.get_results_backend() | ||
|
|
||
| # By default unknown tasks will be in PENDING. There is no reliable | ||
| # way to check if a task exists or not with Celery. | ||
| # https://github.com/celery/celery/issues/3596#issuecomment-262102185 | ||
| if result_backend is not None: | ||
| try: | ||
| # Retrieve a Message for this task id from the backend, then get result | ||
| # See Dramatiq results API: message.get_result(...) | ||
| message = result_backend.get_message(task_id) | ||
| result = message.get_result(backend=result_backend, block=False) | ||
| return Response( | ||
|
Comment on lines
+70
to
+79
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chainVerify results-backend access pattern. broker.get_results_backend() and backend.get_message(...) must match your configured middleware/version. If not available, fetch the Results middleware and read its backend instead. 🌐 Web query: 💡 Result:
Sources: [1] Dramatiq Broker API (get_results_backend). [2] Dramatiq RedisBackend / results backend (get_result, build_message_key, _get/_store). Fix incorrect RedisBackend API usage—get_message(task_id) doesn't exist. RedisBackend does not expose a get_message(message_id) API. The code at lines 70-79 calls Retrieve the Message from Dramatiq's actor system or message store, then pass it to the backend's get_result() method. 🤖 Prompt for AI Agents |
||
| { | ||
| "status": "SUCCESS", | ||
| "result": result, | ||
| "error": None, | ||
| } | ||
| ) | ||
| except ResultMissing: | ||
| # No result yet; fall through to progress/pending logic | ||
| pass | ||
| except ResultFailure as exc: | ||
| return Response( | ||
| { | ||
| "status": "FAILURE", | ||
| "result": None, | ||
| "error": str(exc), | ||
| } | ||
| ) | ||
| except ResultTimeout as exc: | ||
| # Treat timeouts as pending | ||
| logger.debug("Result timeout for task %s: %s", task_id, exc) | ||
|
|
||
| # Prepare the response data | ||
| result_data = { | ||
| "status": task_result.status, | ||
| "result": None, | ||
| "error": None, | ||
| } | ||
| # Check if we have progress data for this task | ||
| progress_data = get_task_progress(task_id) | ||
|
|
||
| # If the result is a dict with status/result/error, unpack and propagate status | ||
| if isinstance(task_result.result, dict) and set(task_result.result.keys()) >= { | ||
| "status", | ||
| "result", | ||
| "error", | ||
| }: | ||
| result_data["status"] = task_result.result["status"] | ||
| result_data["result"] = task_result.result["result"] | ||
| result_data["error"] = task_result.result["error"] | ||
| else: | ||
| result_data["result"] = task_result.result | ||
| if task_result.state == "PROGRESS" and task_result.info: | ||
| result_data.update(task_result.info) | ||
| if progress_data: | ||
| # Task is in progress | ||
| return Response( | ||
| { | ||
| "status": "PROGRESS", | ||
| "result": None, | ||
| "error": None, | ||
| "progress": progress_data.get("progress"), | ||
| "message": progress_data.get("metadata", {}).get("message"), | ||
| "timestamp": progress_data.get("timestamp"), | ||
| } | ||
| ) | ||
|
|
||
| return Response(result_data) | ||
| # Default to pending when no result and no progress | ||
| return Response( | ||
| { | ||
| "status": "PENDING", | ||
| "result": None, | ||
| "error": None, | ||
| } | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WSGI bridge ignores status/headers and sets an invalid wsgi.input; prefix likely wrong.
Capture status/headers from start_response, build environ from request.META with a real byte stream, and set DashboardApp prefix to the mounted path for correct links.