diff --git a/worker/config/custom-environment-variables.mjs b/worker/config/custom-environment-variables.mjs index a87e1670..563424c1 100644 --- a/worker/config/custom-environment-variables.mjs +++ b/worker/config/custom-environment-variables.mjs @@ -16,6 +16,7 @@ export default { } }, maxFailures: 'MAX_FAILURES', + maxFailuresCooldown: 'MAX_FAILURES_COOLDOWN', mongoUrl: 'MONGO_URL', secretKeys: { events: 'SECRET_EVENTS' diff --git a/worker/config/default.mjs b/worker/config/default.mjs index 00b88630..9168cf9b 100644 --- a/worker/config/default.mjs +++ b/worker/config/default.mjs @@ -22,6 +22,7 @@ export default { } }, maxFailures: 10, + maxFailuresCooldown: 12, // in hours mongoUrl: 'mongodb://localhost:27017/data-fair-processings', privateEventsUrl: null, secretKeys: { diff --git a/worker/config/development.mjs b/worker/config/development.mjs index c6dbb051..dca073e6 100644 --- a/worker/config/development.mjs +++ b/worker/config/development.mjs @@ -4,6 +4,7 @@ export default { dataFairAdminMode: true, dataFairAPIKey: '', // override in local-development.cjs maxFailures: 2, + maxFailuresCooldown: 0.05, // 3 minutes mongoUrl: 'mongodb://localhost:27017/data-fair-processings-development', secretKeys: { events: 'secret-events' diff --git a/worker/config/type/schema.json b/worker/config/type/schema.json index 21d1b79a..a5f559e9 100644 --- a/worker/config/type/schema.json +++ b/worker/config/type/schema.json @@ -19,6 +19,7 @@ "defaultLimits", "mails", "maxFailures", + "maxFailuresCooldown", "mongoUrl", "locks", "observer", @@ -70,6 +71,10 @@ "type": "number", "description": "Number of failures before a processing becomes inactive" }, + "maxFailuresCooldown": { + "type": "number", + "description": "Minimum time in hours between first failure and last failure to disable a processing. Set to 0 to disable cooldown." + }, "mongoUrl": { "type": "string" }, diff --git a/worker/src/utils/runs.ts b/worker/src/utils/runs.ts index 6dafb53a..d04e52f6 100644 --- a/worker/src/utils/runs.ts +++ b/worker/src/utils/runs.ts @@ -102,23 +102,44 @@ export const finish = async (run: Run, errorMessage: string | undefined = undefi } else if (lastRun.status === 'error') { sendProcessingEvent(run, 'a échoué', 'finish-error', errorMessage) - const reachedMaxFailures = (await mongo.runs.aggregate([ - { $match: { 'processing._id': run.processing._id } }, // filter by processing - { $sort: { finishedAt: -1 } }, // sort by finishedAt descending (most recent first) - { $limit: config.maxFailures }, // take the last X runs + const raw = (await mongo.runs.aggregate([ + { $match: { 'processing._id': run.processing._id } }, + { $sort: { finishedAt: -1 } }, { - $group: { // aggregate - _id: null, - total: { $sum: 1 }, // count total runs in this slice - errors: { $sum: { $cond: [{ $eq: ['$status', 'error'] }, 1, 0] } } // count runs with status 'error' - } - }, - { - $project: { - allErrors: { $eq: [config.maxFailures, '$errors'] } // true if all X runs are errors + $facet: { + lastRuns: [ // for maxFailures + { $limit: config.maxFailures }, + { + $group: { + _id: null, + errors: { $sum: { $cond: [{ $eq: ['$status', 'error'] }, 1, 0] } } + } + } + ], + allErrors: [ // for cooldown + { $match: { status: 'error' } }, + { + $group: { + _id: null, + firstError: { $min: '$finishedAt' }, + lastError: { $max: '$finishedAt' } + } + } + ] } } - ]).toArray())[0]?.allErrors ?? false + ]).toArray())[0] + + const errors = raw?.lastRuns?.[0]?.errors ?? 0 // number of errors among the last maxFailures runs + const firstError = raw?.allErrors?.[0]?.firstError ? new Date(raw.allErrors[0].firstError) : null // date of the first error (across all runs) + const lastError = raw?.allErrors?.[0]?.lastError ? new Date(raw.allErrors[0].lastError) : null // date of the last error (across all runs) + + const allErrors = errors === config.maxFailures + const cooldownReached = firstError && lastError + ? (lastError.getTime() - firstError.getTime()) / (1000 * 60 * 60) >= config.maxFailuresCooldown // (1000 * 60 * 60) convert to hours + : false + + const reachedMaxFailures = allErrors && cooldownReached // Disable processing if reached max failures if (reachedMaxFailures) {