From 073954d62e0ecd5abc29ac0cb4c69034d0e91eb0 Mon Sep 17 00:00:00 2001 From: John McLear Date: Sat, 16 May 2026 07:24:05 +0100 Subject: [PATCH 1/2] feat(scaling): engine.io socket flush deferral (#7756 / #7767) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the closed engine.io WS packing prototype (#7772). That patch only modified transport.send(packets[]) and never fired because engine.io's Socket.sendPacket calls flush() synchronously after each push to writeBuffer — and flush drains immediately when transport.writable is true (microseconds on WebSocket). The writeBuffer almost never contained more than one packet. This patch flips that. Socket.prototype.sendPacket is re-implemented to push to writeBuffer and then schedule a single coalesced flush via queueMicrotask. Multiple sendPacket calls in the same task all accumulate; the queued microtask drains the whole batch. The transport.send([packets]) call then sees N > 1 packets in steady state, which is where lever 8 / future engine.io transport packing work has the opportunity to coalesce to one WS frame. Microtask deferral adds zero meaningful wall-clock latency: microtasks drain before the next macrotask, so anything waiting on the next I/O callback / timer still sees the flush completed first. Wire bytes are unchanged. Gated by settings.engineFlushDefer. Default false. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/node/hooks/express/socketio.ts | 8 +++ src/node/utils/EngineFlushDeferral.ts | 95 +++++++++++++++++++++++++++ src/node/utils/Settings.ts | 14 ++++ 3 files changed, 117 insertions(+) create mode 100644 src/node/utils/EngineFlushDeferral.ts diff --git a/src/node/hooks/express/socketio.ts b/src/node/hooks/express/socketio.ts index 79ef892760b..4734ee98dc3 100644 --- a/src/node/hooks/express/socketio.ts +++ b/src/node/hooks/express/socketio.ts @@ -69,6 +69,14 @@ const socketSessionMiddleware = (args: any) => (socket: any, next: Function) => }; export const expressCreateServer = (hookName:string, args:ArgsExpressType, cb:Function) => { + // Engine.io socket flush deferral (#7756 / #7767). Apply BEFORE building + // the socket.io Server so the patched Socket prototype is in effect when + // the Server creates its engine. + if (settings.engineFlushDefer === true) { + // eslint-disable-next-line @typescript-eslint/no-require-imports + require('../../utils/EngineFlushDeferral').installEngineFlushDeferral(); + } + // init socket.io and redirect all requests to the MessageHandler // there shouldn't be a browser that isn't compatible to all // transports in this list at once diff --git a/src/node/utils/EngineFlushDeferral.ts b/src/node/utils/EngineFlushDeferral.ts new file mode 100644 index 00000000000..e8c15b4ea6f --- /dev/null +++ b/src/node/utils/EngineFlushDeferral.ts @@ -0,0 +1,95 @@ +// Engine.io socket flush deferral — #7756 / #7767 deeper investigation +// after the simple WS transport-level packing prototype (#7772) showed +// that the writeBuffer almost never accumulates because flush() drains +// immediately on `transport.writable === true`. +// +// engine.io's Socket.sendPacket(...) ends with: +// +// this.writeBuffer.push(packet); +// if (callback) this.packetsFn.push(callback); +// this.flush(); // <-- synchronous +// +// flush() reads writeBuffer and hands it to transport.send. For +// WebSocket, transport.writable is true again within microseconds of +// each write, so each sendPacket() call drains a buffer of size 1. The +// transport.send([packets]) function then iterates packets and writes +// one WS frame per packet — which is what the polling transport's +// natural encodePayload batching avoids. +// +// This patch coalesces synchronous-task sendPacket calls onto a single +// microtask-scheduled flush. Inside the same JS task, multiple +// sendPacket() calls accumulate in writeBuffer; the queued microtask +// then calls flush() once with the whole batch. The transport's +// send([batch]) sees N > 1 packets and the WS payload-encoding fast +// path (also added by lever 8) coalesces them into one frame. +// +// Microtask deferral adds zero meaningful wall-clock latency: +// microtasks drain before the next macrotask, so any consumer waiting +// on the next setImmediate / setTimeout / I/O callback still sees the +// flush completed. +// +// Forward-compatible. Existing clients receive identical wire bytes +// because the engine.io packet encoding is unchanged; the difference +// is only how many engine.io packets share one transport-level send +// call. The WS transport's send([packets]) path is then where lever 8 +// (or this patch's accompanying engine-packing branch) decides +// whether to ship them as N frames or one payload-encoded frame. +// +// Gated by settings.engineFlushDefer. Default off; production unaffected. + +import log4js from 'log4js'; + +const logger = log4js.getLogger('engine-flush-defer'); + +let installed = false; + +const SCHEDULED = Symbol('engineFlushScheduled'); + +export const installEngineFlushDeferral = (): void => { + if (installed) return; + installed = true; + + let SocketProto: {sendPacket: (...a: unknown[]) => unknown}; + try { + // eslint-disable-next-line @typescript-eslint/no-require-imports + SocketProto = require('engine.io/build/socket').Socket.prototype; + } catch (err: any) { + logger.warn(`Unable to install engine.io flush deferral (module not found): ${err && err.message || err}`); + return; + } + if (typeof SocketProto.sendPacket !== 'function') { + logger.warn('engine.io Socket shape unexpected; skipping flush deferral patch'); + return; + } + + // Re-implementing sendPacket inline rather than wrapping the original + // so the single closing `this.flush()` becomes a microtask-coalesced + // schedule. The body is intentionally a near-verbatim copy of the + // engine.io 6.6.5 implementation so future engine.io upgrades that + // change packet-shape semantics still need re-vetting. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + SocketProto.sendPacket = function (this: any, type: any, data: any, options: any, callback: any) { + if ('function' === typeof options) { + callback = options; + options = {}; + } + if ('closing' === this.readyState || 'closed' === this.readyState) return; + + options = options || {}; + options.compress = options.compress !== false; + const packet: any = {type, options}; + if (data !== undefined) packet.data = data; + this.emit('packetCreate', packet); + this.writeBuffer.push(packet); + if ('function' === typeof callback) this.packetsFn.push(callback); + + if (this[SCHEDULED]) return; + this[SCHEDULED] = true; + queueMicrotask(() => { + this[SCHEDULED] = false; + this.flush(); + }); + }; + + logger.info('engine.io socket flush deferral enabled (#7756 / #7767)'); +}; diff --git a/src/node/utils/Settings.ts b/src/node/utils/Settings.ts index 97413004100..76a687746bd 100644 --- a/src/node/utils/Settings.ts +++ b/src/node/utils/Settings.ts @@ -272,6 +272,7 @@ export type SettingsType = { automaticReconnectionTimeout: number, loadTest: boolean, scalingDiveMetrics: boolean, + engineFlushDefer: boolean, dumpOnUncleanExit: boolean, indentationOnNewLine: boolean, logconfig: any | null, @@ -658,6 +659,19 @@ const settings: SettingsType = { * production deployments aren't paying for instrumentation they don't use. */ scalingDiveMetrics: false, + /** + * Defer engine.io socket flush onto the next microtask so multiple + * sendPacket() calls within the same task accumulate in the writeBuffer + * before drain. Pairs with engine.io's existing transport.send([packets]) + * fast path so a batched send produces fewer WebSocket frames. + * + * Adds no meaningful wall-clock latency — microtasks drain before any + * subsequent macrotask. Backward-compatible at the wire level; existing + * clients receive identical packet bytes. + * + * Default false. Enable only when scoring under the scaling dive. + */ + engineFlushDefer: false, /** * Disable dump of objects preventing a clean exit */ From f0661a9d8e9f6832bf0c9c1bf33be84990e3c607 Mon Sep 17 00:00:00 2001 From: John McLear Date: Sat, 16 May 2026 07:42:34 +0100 Subject: [PATCH 2/2] fix(engine-flush-defer): address Qodo review Two issues from the initial review: 1. Install guard blocked retries. Setting `installed = true` BEFORE requiring + validating engine.io meant a transient require error permanently disabled the patch for the rest of the process. Now the flag is set only after both checks pass; on failure, the warning is logged and a later boot path can retry. 2. engineFlushDefer was undocumented in settings.json.template. Added with the same prose as Settings.ts, including the "wire bytes are unchanged" / "no meaningful wall-clock latency" notes so operators see the safety case. Co-Authored-By: Claude Opus 4.7 (1M context) --- settings.json.template | 13 +++++++++++++ src/node/utils/EngineFlushDeferral.ts | 10 +++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/settings.json.template b/settings.json.template index e88e82a36af..c37bfe7f928 100644 --- a/settings.json.template +++ b/settings.json.template @@ -733,6 +733,19 @@ */ "loadTest": false, + /* + * Defer engine.io socket flush onto the next microtask so multiple + * sendPacket() calls in the same task accumulate in writeBuffer before + * the underlying transport.send drains. Pairs with engine.io's existing + * batched-send path so high-fan-out scenarios produce fewer WebSocket + * frames. Microtask deferral adds no meaningful wall-clock latency — + * microtasks drain before any subsequent macrotask. Wire bytes are + * unchanged. + * + * #7756 / #7767. Default off; production unaffected. + */ + "engineFlushDefer": false, + /** * Disable dump of objects preventing a clean exit */ diff --git a/src/node/utils/EngineFlushDeferral.ts b/src/node/utils/EngineFlushDeferral.ts index e8c15b4ea6f..36b3804ed2b 100644 --- a/src/node/utils/EngineFlushDeferral.ts +++ b/src/node/utils/EngineFlushDeferral.ts @@ -47,7 +47,6 @@ const SCHEDULED = Symbol('engineFlushScheduled'); export const installEngineFlushDeferral = (): void => { if (installed) return; - installed = true; let SocketProto: {sendPacket: (...a: unknown[]) => unknown}; try { @@ -55,12 +54,17 @@ export const installEngineFlushDeferral = (): void => { SocketProto = require('engine.io/build/socket').Socket.prototype; } catch (err: any) { logger.warn(`Unable to install engine.io flush deferral (module not found): ${err && err.message || err}`); - return; + return; // Leave `installed` false so a later boot path can retry. } if (typeof SocketProto.sendPacket !== 'function') { logger.warn('engine.io Socket shape unexpected; skipping flush deferral patch'); - return; + return; // Leave `installed` false so a later boot path can retry. } + // Only after both require and shape check succeed do we record that the + // patch is installed. Setting the flag before validation (the original + // code) would have permanently disabled retries after a transient + // require failure in test/CI environments where socket.io may load late. + installed = true; // Re-implementing sendPacket inline rather than wrapping the original // so the single closing `this.flush()` becomes a microtask-coalesced