Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions worker/config/custom-environment-variables.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export default {
}
},
maxFailures: 'MAX_FAILURES',
maxFailuresCooldown: 'MAX_FAILURES_COOLDOWN',
mongoUrl: 'MONGO_URL',
secretKeys: {
events: 'SECRET_EVENTS'
Expand Down
1 change: 1 addition & 0 deletions worker/config/default.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export default {
}
},
maxFailures: 10,
maxFailuresCooldown: 12, // in hours
mongoUrl: 'mongodb://localhost:27017/data-fair-processings',
privateEventsUrl: null,
secretKeys: {
Expand Down
1 change: 1 addition & 0 deletions worker/config/development.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export default {
dataFairAdminMode: true,
dataFairAPIKey: '', // override in local-development.cjs
maxFailures: 2,
maxFailuresCooldown: 0.05, // 3 minutes
mongoUrl: 'mongodb://localhost:27017/data-fair-processings-development',
secretKeys: {
events: 'secret-events'
Expand Down
5 changes: 5 additions & 0 deletions worker/config/type/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"defaultLimits",
"mails",
"maxFailures",
"maxFailuresCooldown",
"mongoUrl",
"locks",
"observer",
Expand Down Expand Up @@ -70,6 +71,10 @@
"type": "number",
"description": "Number of failures before a processing becomes inactive"
},
"maxFailuresCooldown": {
"type": "number",
"description": "Minimum time in hours between first failure and last failure to disable a processing. Set to 0 to disable cooldown."
},
"mongoUrl": {
"type": "string"
},
Expand Down
49 changes: 35 additions & 14 deletions worker/src/utils/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,23 +102,44 @@ export const finish = async (run: Run, errorMessage: string | undefined = undefi
} else if (lastRun.status === 'error') {
sendProcessingEvent(run, 'a échoué', 'finish-error', errorMessage)

const reachedMaxFailures = (await mongo.runs.aggregate([
{ $match: { 'processing._id': run.processing._id } }, // filter by processing
{ $sort: { finishedAt: -1 } }, // sort by finishedAt descending (most recent first)
{ $limit: config.maxFailures }, // take the last X runs
const raw = (await mongo.runs.aggregate([
{ $match: { 'processing._id': run.processing._id } },
{ $sort: { finishedAt: -1 } },
{
$group: { // aggregate
_id: null,
total: { $sum: 1 }, // count total runs in this slice
errors: { $sum: { $cond: [{ $eq: ['$status', 'error'] }, 1, 0] } } // count runs with status 'error'
}
},
{
$project: {
allErrors: { $eq: [config.maxFailures, '$errors'] } // true if all X runs are errors
$facet: {
lastRuns: [ // for maxFailures
{ $limit: config.maxFailures },
{
$group: {
_id: null,
errors: { $sum: { $cond: [{ $eq: ['$status', 'error'] }, 1, 0] } }
}
}
],
allErrors: [ // for cooldown
{ $match: { status: 'error' } },
{
$group: {
_id: null,
firstError: { $min: '$finishedAt' },
lastError: { $max: '$finishedAt' }
}
}
]
}
}
]).toArray())[0]?.allErrors ?? false
]).toArray())[0]

const errors = raw?.lastRuns?.[0]?.errors ?? 0 // number of errors among the last maxFailures runs
const firstError = raw?.allErrors?.[0]?.firstError ? new Date(raw.allErrors[0].firstError) : null // date of the first error (across all runs)
const lastError = raw?.allErrors?.[0]?.lastError ? new Date(raw.allErrors[0].lastError) : null // date of the last error (across all runs)

const allErrors = errors === config.maxFailures
const cooldownReached = firstError && lastError
? (lastError.getTime() - firstError.getTime()) / (1000 * 60 * 60) >= config.maxFailuresCooldown // (1000 * 60 * 60) convert to hours
: false

const reachedMaxFailures = allErrors && cooldownReached

// Disable processing if reached max failures
if (reachedMaxFailures) {
Expand Down