Skip to content

Commit 48c5924

Browse files
committed
refactor(workers): Remove circuit breaker implementation
The circuit breaker pattern was causing more problems than it solved. With a low number of workers but high number of email accounts, a single faulty account generating errors could block access to many healthy accounts managed by the same worker. Changes: - Remove circuit breaker constants and WeakMap from server.js - Remove circuit breaker helper functions (getCircuitBreaker, recordCircuitSuccess, recordCircuitFailure, isCircuitOpen) - Simplify call() function to remove circuit breaker logic - Remove circuit state reporting from getThreadsInfo() - Remove CircuitOpen error handling from metrics collector - Remove circuit breaker UI display from routes and templates - Clean up circuit breaker comments in OAuth modules The worker communication system now operates without artificial blocking. Workers process requests directly and return actual errors from faulty accounts without affecting other accounts. The existing timeout mechanism and heartbeat-based health monitoring remain in place.
1 parent c3692b9 commit 48c5924

File tree

6 files changed

+8
-194
lines changed

6 files changed

+8
-194
lines changed

lib/metrics-collector.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ class MetricsCollector {
150150
resourceUsageError: {
151151
error: err.message,
152152
code: err.code || 'TIMEOUT',
153-
unresponsive: err.code === 'Timeout' || err.code === 'CircuitOpen'
153+
unresponsive: err.code === 'Timeout'
154154
},
155155
...metadata
156156
});

lib/oauth/gmail.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ class GmailOauth {
309309

310310
if (!res.ok) {
311311
let err = new Error('Token request failed');
312-
err.statusCode = res.status; // Set statusCode for circuit breaker logic
312+
err.statusCode = res.status;
313313
err.tokenRequest = {
314314
url: requestUrl,
315315
method,
@@ -420,7 +420,7 @@ class GmailOauth {
420420

421421
if (!res.ok) {
422422
let err = new Error('Token request failed');
423-
err.statusCode = res.status; // Set statusCode for circuit breaker logic
423+
err.statusCode = res.status;
424424
err.tokenRequest = {
425425
url: requestUrl,
426426
method,
@@ -507,7 +507,7 @@ class GmailOauth {
507507

508508
if (!res.ok) {
509509
let err = new Error('OAuth2 request failed');
510-
err.statusCode = res.status; // Set statusCode for circuit breaker logic
510+
err.statusCode = res.status;
511511
err.oauthRequest = {
512512
o: 1,
513513
url,

lib/oauth/outlook.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ class OutlookOauth {
289289

290290
if (!res.ok) {
291291
let err = new Error('Token request failed');
292-
err.statusCode = res.status; // Set statusCode for circuit breaker logic
292+
err.statusCode = res.status;
293293
err.tokenRequest = {
294294
url: requestUrl,
295295
method,
@@ -382,7 +382,7 @@ class OutlookOauth {
382382

383383
if (!res.ok) {
384384
let err = new Error('Token request failed');
385-
err.statusCode = res.status; // Set statusCode for circuit breaker logic
385+
err.statusCode = res.status;
386386
err.tokenRequest = {
387387
url: requestUrl,
388388
method,
@@ -469,7 +469,7 @@ class OutlookOauth {
469469

470470
if (!res.ok) {
471471
let err = new Error('OAuth2 request failed');
472-
err.statusCode = res.status; // Set statusCode for circuit breaker logic
472+
err.statusCode = res.status;
473473
err.oauthRequest = {
474474
url,
475475
method,

lib/routes-ui.js

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9972,21 +9972,6 @@ ${now}`,
99729972
}
99739973
}
99749974

9975-
// Process circuit breaker status
9976-
if (threadInfo.circuitState && threadInfo.circuitState !== 'closed') {
9977-
if (threadInfo.circuitState === 'open') {
9978-
threadInfo.circuitBadge = 'Circuit Open';
9979-
threadInfo.circuitBadgeType = 'danger';
9980-
} else if (threadInfo.circuitState === 'half-open') {
9981-
threadInfo.circuitBadge = 'Circuit Testing';
9982-
threadInfo.circuitBadgeType = 'warning';
9983-
}
9984-
9985-
// Add failure count if present
9986-
if (threadInfo.circuitFailures > 0) {
9987-
threadInfo.circuitInfo = `${threadInfo.circuitFailures} failures`;
9988-
}
9989-
}
99909975

99919976
for (let key of Object.keys(threadInfo)) {
99929977
switch (key) {

server.js

Lines changed: 1 addition & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -587,12 +587,6 @@ let workerHealthStatus = new WeakMap(); // Map of worker -> health status
587587
const HEARTBEAT_TIMEOUT = 30 * 1000; // 30 seconds before marking unhealthy
588588
const HEARTBEAT_RESTART_TIMEOUT = 60 * 1000; // 60 seconds before auto-restart
589589

590-
// Circuit breaker for worker communication
591-
let workerCircuitBreakers = new WeakMap(); // Map of worker -> circuit breaker state
592-
const CIRCUIT_FAILURE_THRESHOLD = 3; // Open circuit after 3 failures
593-
const CIRCUIT_RESET_TIMEOUT = 30 * 1000; // Try to close circuit after 30s
594-
const CIRCUIT_HALF_OPEN_ATTEMPTS = 1; // Number of test requests in half-open state
595-
596590
// Suspended worker types (when no license is active)
597591
let suspendedWorkerTypes = new Set();
598592

@@ -754,11 +748,6 @@ async function getThreadsInfo() {
754748
threadData.timeSinceHeartbeat = Date.now() - lastHeartbeat;
755749
}
756750

757-
// Add circuit breaker status
758-
const circuit = getCircuitBreaker(worker);
759-
threadData.circuitState = circuit.state;
760-
threadData.circuitFailures = circuit.failures;
761-
762751
// Add worker metadata
763752
let workerMeta = workersMeta.has(worker) ? workersMeta.get(worker) : {};
764753
for (let key of Object.keys(workerMeta)) {
@@ -867,119 +856,6 @@ function startHealthMonitoring() {
867856
}, 5000);
868857
}
869858

870-
/**
871-
* Get or initialize circuit breaker state for a worker
872-
* @param {Worker} worker - The worker thread
873-
* @returns {Object} Circuit breaker state
874-
*/
875-
function getCircuitBreaker(worker) {
876-
if (!workerCircuitBreakers.has(worker)) {
877-
workerCircuitBreakers.set(worker, {
878-
state: 'closed', // closed, open, half-open
879-
failures: 0,
880-
lastFailureTime: null,
881-
lastAttemptTime: null,
882-
halfOpenAttempts: 0
883-
});
884-
}
885-
return workerCircuitBreakers.get(worker);
886-
}
887-
888-
/**
889-
* Record a successful call to a worker
890-
* @param {Worker} worker - The worker thread
891-
*/
892-
function recordCircuitSuccess(worker) {
893-
const circuit = getCircuitBreaker(worker);
894-
895-
if (circuit.state === 'half-open') {
896-
// Successful call in half-open state, close the circuit
897-
logger.info({
898-
msg: 'Circuit breaker closed after successful test',
899-
threadId: worker.threadId,
900-
type: workersMeta.get(worker)?.type
901-
});
902-
}
903-
904-
// Reset circuit to closed state
905-
circuit.state = 'closed';
906-
circuit.failures = 0;
907-
circuit.lastFailureTime = null;
908-
circuit.halfOpenAttempts = 0;
909-
}
910-
911-
/**
912-
* Record a failed call to a worker
913-
* @param {Worker} worker - The worker thread
914-
*/
915-
function recordCircuitFailure(worker) {
916-
const circuit = getCircuitBreaker(worker);
917-
const now = Date.now();
918-
919-
circuit.failures++;
920-
circuit.lastFailureTime = now;
921-
922-
if (circuit.state === 'half-open') {
923-
// Failed in half-open state, reopen the circuit
924-
circuit.state = 'open';
925-
circuit.halfOpenAttempts = 0;
926-
logger.warn({
927-
msg: 'Circuit breaker reopened after failed test',
928-
threadId: worker.threadId,
929-
type: workersMeta.get(worker)?.type
930-
});
931-
} else if (circuit.failures >= CIRCUIT_FAILURE_THRESHOLD && circuit.state === 'closed') {
932-
// Threshold reached, open the circuit
933-
circuit.state = 'open';
934-
logger.warn({
935-
msg: 'Circuit breaker opened due to failures',
936-
threadId: worker.threadId,
937-
type: workersMeta.get(worker)?.type,
938-
failures: circuit.failures
939-
});
940-
}
941-
}
942-
943-
/**
944-
* Check if circuit breaker allows a call to the worker
945-
* @param {Worker} worker - The worker thread
946-
* @returns {boolean} Whether the call is allowed
947-
*/
948-
function isCircuitOpen(worker) {
949-
const circuit = getCircuitBreaker(worker);
950-
const now = Date.now();
951-
952-
if (circuit.state === 'closed') {
953-
return false; // Circuit is closed, allow calls
954-
}
955-
956-
if (circuit.state === 'open') {
957-
// Check if enough time has passed to try half-open
958-
if (now - circuit.lastFailureTime > CIRCUIT_RESET_TIMEOUT) {
959-
circuit.state = 'half-open';
960-
circuit.halfOpenAttempts = 0;
961-
logger.info({
962-
msg: 'Circuit breaker entering half-open state',
963-
threadId: worker.threadId,
964-
type: workersMeta.get(worker)?.type
965-
});
966-
return false; // Allow one test call
967-
}
968-
return true; // Circuit is open, block calls
969-
}
970-
971-
if (circuit.state === 'half-open') {
972-
// Allow limited attempts in half-open state
973-
if (circuit.halfOpenAttempts < CIRCUIT_HALF_OPEN_ATTEMPTS) {
974-
circuit.halfOpenAttempts++;
975-
return false; // Allow test call
976-
}
977-
return true; // Block additional calls until test succeeds
978-
}
979-
980-
return false;
981-
}
982-
983859
/**
984860
* Send a webhook notification
985861
* @param {string} account - Account ID
@@ -1530,27 +1406,6 @@ let spawnWorker = async type => {
15301406
* @throws {Error} Timeout or communication error
15311407
*/
15321408
async function call(worker, message, transferList) {
1533-
// Check circuit breaker first
1534-
if (isCircuitOpen(worker)) {
1535-
const err = new Error('Circuit breaker is open - worker is unresponsive');
1536-
err.statusCode = 503;
1537-
err.code = 'CircuitOpen';
1538-
err.threadId = worker.threadId;
1539-
1540-
// For resource-usage calls, return error info instead of throwing
1541-
if (message.cmd === 'resource-usage') {
1542-
return {
1543-
resourceUsageError: {
1544-
error: err.message,
1545-
code: err.code,
1546-
circuitOpen: true
1547-
}
1548-
};
1549-
}
1550-
1551-
throw err;
1552-
}
1553-
15541409
return new Promise((resolve, reject) => {
15551410
// Generate unique message ID
15561411
let mid = `${Date.now()}:${++mids}`;
@@ -1566,33 +1421,19 @@ async function call(worker, message, transferList) {
15661421
err.ttl = ttl;
15671422
err.command = message;
15681423

1569-
// Record circuit breaker failure
1570-
recordCircuitFailure(worker);
15711424
callQueue.delete(mid);
15721425

15731426
reject(err);
15741427
}, ttl);
15751428

1576-
// Store callback info with circuit breaker tracking
1429+
// Store callback info
15771430
callQueue.set(mid, {
15781431
resolve: result => {
15791432
clearTimeout(timer);
1580-
recordCircuitSuccess(worker);
15811433
resolve(result);
15821434
},
15831435
reject: err => {
15841436
clearTimeout(timer);
1585-
1586-
// Only record circuit breaker failures for actual worker/infrastructure issues
1587-
// Do NOT count application-level errors that indicate the worker is functioning correctly
1588-
// Application errors have statusCode in the 4xx range (client errors)
1589-
// Infrastructure errors have statusCode in the 5xx range (server errors) or are timeouts
1590-
const isInfrastructureFailure = !err.statusCode || err.statusCode >= 500 || err.code === 'Timeout';
1591-
1592-
if (isInfrastructureFailure) {
1593-
recordCircuitFailure(worker);
1594-
}
1595-
15961437
reject(err);
15971438
},
15981439
timer
@@ -3150,11 +2991,6 @@ const startApplication = async () => {
31502991
metadata.timeSinceHeartbeat = Date.now() - lastHeartbeat;
31512992
}
31522993

3153-
// Add circuit breaker status
3154-
const circuit = getCircuitBreaker(worker);
3155-
metadata.circuitState = circuit.state;
3156-
metadata.circuitFailures = circuit.failures;
3157-
31582994
// Add worker metadata
31592995
let workerMeta = workersMeta.has(worker) ? workersMeta.get(worker) : {};
31602996
for (let key of Object.keys(workerMeta)) {

views/internals/index.hbs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,6 @@
4444
{{else if healthBadge}}
4545
<span class="badge badge-{{healthBadgeType}}">{{healthBadge}}</span>
4646
{{/if}}
47-
48-
{{#if circuitBadge}}
49-
<span class="badge badge-{{circuitBadgeType}}">{{circuitBadge}}</span>
50-
{{#if circuitInfo}}
51-
<small class="text-muted">({{circuitInfo}})</small>
52-
{{/if}}
53-
{{/if}}
5447

5548
</td>
5649
<td class="text-right">{{formatInteger threadId userLocale}}</td>

0 commit comments

Comments
 (0)