Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .cursor/rules/storage-provider.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ Follow all steps below to implement a new storage. More details follow after the
- [ ] Make poetry lock: `poetry install && poetry lock`
12. [ ] Create database migrations using `poetry run python manage.py makemigrations` only!
13. [ ] Ensure that you correctly handle token and security fields; they should not be displayed on the frontend or backend after they are initially entered and saved. Verify how this works with other storage codes.
14. [ ] **Error Handling Strategy:**
- **Import**: If `VolumesAPI.iter_files` or other listing helpers call `resp.raise_for_status()`, make sure callers wrap these in try/except (or downgrade to debug logging) so recursive listing failures can be surfaced without crashing the sync job.
- **Export**: When saving/deleting annotations, *never* let `resp.raise_for_status()` propagate to the RQ worker. Catch `RequestException` (and generic exceptions), log a warning, and record the error in `Storage.meta` and `Storage.traceback` so the user can see why some files were skipped. Jobs must finish with the correct storage status (`completed_with_errors`) even when the provider intermittently fails.
- Update this guide whenever new patterns emerge so providers keep consistent behavior.

### 3. Frontend Implementation
1. [ ] Check examples: for Open Source see: `label-studio/web/apps/labelstudio/src/pages/Settings/StorageSettings/providers/`, for Enterprise see: `label-studio-enterprise/web/apps/labelstudio/src/pages/Settings/StorageSettings/providers/`
Expand Down
9 changes: 7 additions & 2 deletions label_studio/core/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,13 @@ def is_job_on_worker(job_id, queue_name):
:return: True if job on worker
"""
registry = StartedJobRegistry(queue_name, connection=_redis)
ids = registry.get_job_ids()
return job_id in ids
member = job_id.encode() if isinstance(job_id, str) else job_id
# Use Redis ZSET membership check (ZSCORE) instead of registry.get_job_ids(),
# because the latter calls registry.cleanup(), which installs SIGALRM timers and
# crashes when executed outside the interpreter's main thread (e.g., inside WSGI).
# ZSCORE simply looks up the score of the member in the sorted set: if it returns
# None, the member/job ID is not present; otherwise it is currently marked as running.
return registry.connection.zscore(registry.key, member) is not None


def delete_job_by_id(queue, id):
Expand Down
68 changes: 68 additions & 0 deletions label_studio/core/tests/test_redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import threading
from unittest.mock import MagicMock, patch

import pytest

from label_studio.core.redis import is_job_on_worker


def test_is_job_on_worker_does_not_call_get_job_ids():
    """The membership check must query the ZSET directly, never get_job_ids().

    StartedJobRegistry.get_job_ids() triggers registry.cleanup(), which installs
    SIGALRM timers and crashes outside the interpreter's main thread — so the
    implementation must go through connection.zscore() with the encoded job id.
    """
    connection_stub = MagicMock()
    connection_stub.zscore.return_value = None

    with patch('label_studio.core.redis.StartedJobRegistry') as mocked_registry_cls:
        registry_stub = mocked_registry_cls.return_value
        registry_stub.connection = connection_stub
        registry_stub.key = 'rq:started:low'
        # Any call into the forbidden API must fail the test loudly.
        registry_stub.get_job_ids.side_effect = ValueError('should not be called')

        on_worker = is_job_on_worker('job123', 'low')

    assert on_worker is False
    connection_stub.zscore.assert_called_once_with('rq:started:low', b'job123')


def test_is_job_on_worker_safe_from_non_main_thread(monkeypatch):
    """Regression test: is_job_on_worker must be callable from a worker thread.

    The old implementation went through registry.get_job_ids(), which installs
    a SIGALRM handler; CPython only allows installing signal handlers from the
    main thread, so the call blew up inside WSGI/worker threads. Reproduce that
    restriction by patching signal.signal and run the check in a fresh thread.
    """
    import signal

    real_signal = signal.signal

    def guarded_signal(signum, handler):
        # Mirror CPython's restriction: handler installation is main-thread only.
        if threading.current_thread() is not threading.main_thread():
            raise ValueError('signal only works in main thread of the main interpreter')
        return real_signal(signum, handler)

    monkeypatch.setattr(signal, 'signal', guarded_signal)

    zset_connection = MagicMock()
    zset_connection.zscore.return_value = None

    class DummyRegistry:
        def __init__(self, queue_name, connection):
            self.connection = zset_connection
            self.key = f'rq:started:{queue_name}'

        def get_job_ids(self):
            # Legacy code path: installs SIGALRM, which fails off the main thread.
            signal.signal(signal.SIGALRM, lambda *args: None)
            return []

    with patch('label_studio.core.redis.StartedJobRegistry', DummyRegistry):
        outcome: dict[str, object] = {}

        def worker():
            try:
                outcome['value'] = is_job_on_worker('job123', 'low')
            except Exception as exc:  # pragma: no cover - used for regression verification
                outcome['error'] = exc

        thread = threading.Thread(target=worker)
        thread.start()
        thread.join()

        if 'error' in outcome:
            raise outcome['error']  # type: ignore[misc]

        assert outcome['value'] is False

10 changes: 6 additions & 4 deletions label_studio/io_storages/base_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,10 +667,12 @@ def scan_and_create_links(self):

def sync(self):
if redis_connected():
queue = django_rq.get_queue('low')
queue_name = 'low'
queue = django_rq.get_queue(queue_name)
meta = {'project': self.project.id, 'storage': self.id}
if not is_job_in_queue(queue, 'import_sync_background', meta=meta) and not is_job_on_worker(
job_id=self.last_sync_job, queue_name='low'
if (
not is_job_in_queue(queue, 'import_sync_background', meta=meta) and
not is_job_on_worker(job_id=self.last_sync_job, queue_name=queue_name)
):
if not self.info_set_queued():
return
Expand All @@ -680,7 +682,7 @@ def sync(self):
import_sync_background,
self.__class__,
self.id,
queue_name='low',
queue_name=queue_name,
meta=meta,
project_id=self.project.id,
organization_id=self.project.organization.id,
Expand Down
Loading