Skip to content

Commit 80efeee

Browse files
committed
Never rescue jobs that return a timeout of -1
Fix a bug in the job rescuer where jobs with a timeout of -1 (meaning no timeout) were being rescued even though they should never be. Fixes #1287.
1 parent d3b0ef7 commit 80efeee

3 files changed

Lines changed: 12 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql
4545
### Fixed
4646

4747
- Fix `JobCancel` having no effect on running jobs when using a poll-only driver (e.g. `riverdatabasesql`). The `controlActionCancel` event was silently dropped in `fetchAndRunLoop`'s `queueControlCh` handler instead of being forwarded to `maybeCancelJob`. Note: this fix only works within a single process; cross-process cancels in poll-only setups must wait for the next poll cycle. [PR #1245](https://github.com/riverqueue/river/pull/1245).
48+
- Ensure jobs that return a custom timeout of -1 (no timeout) are never rescued. [PR #1288](https://github.com/riverqueue/river/pull/1288).
4849

4950
## [0.39.0] - 2026-06-03
5051

internal/maintenance/job_rescuer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,8 @@ func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRo
312312
slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
313313
}
314314

315-
if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
315+
timeout := workUnit.Timeout()
316+
if timeout < 0 || timeout > 0 && now.Sub(*job.AttemptedAt) < timeout {
316317
return jobRetryDecisionIgnore, time.Time{}
317318
}
318319

internal/maintenance/job_rescuer_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func TestJobRescuer(t *testing.T) {
6565
const (
6666
rescuerJobKind = "rescuer"
6767
rescuerJobKindLongTimeout = "rescuer_long_timeout"
68+
rescuerJobKindNoTimeout = "rescuer_no_timeout"
6869
)
6970

7071
type testBundle struct {
@@ -95,6 +96,8 @@ func TestJobRescuer(t *testing.T) {
9596
return &callbackWorkUnitFactory{Callback: emptyCallback}
9697
case rescuerJobKindLongTimeout:
9798
return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: JobRescuerRescueAfterDefault + 5*time.Minute}
99+
case rescuerJobKindNoTimeout:
100+
return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: -1}
98101
}
99102
panic("unhandled kind: " + kind)
100103
},
@@ -163,6 +166,8 @@ func TestJobRescuer(t *testing.T) {
163166
longTimeOutJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
164167
longTimeOutJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-6 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
165168

169+
noTimeoutJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindNoTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-24 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
170+
166171
require.NoError(t, rescuer.Start(ctx))
167172

168173
rescuer.TestSignals.FetchedBatch.WaitOrTimeout()
@@ -226,6 +231,10 @@ func TestJobRescuer(t *testing.T) {
226231
notTimedOutJob2After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: longTimeOutJob2.ID, Schema: rescuer.Config.Schema})
227232
require.NoError(t, err)
228233
require.Equal(t, rivertype.JobStateRetryable, notTimedOutJob2After.State)
234+
235+
noTimeoutJobAfter, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: noTimeoutJob.ID, Schema: rescuer.Config.Schema})
236+
require.NoError(t, err)
237+
require.Equal(t, rivertype.JobStateRunning, noTimeoutJobAfter.State)
229238
})
230239

231240
t.Run("RescuesInBatches", func(t *testing.T) {

0 commit comments

Comments
 (0)