diff --git a/settings.json.template b/settings.json.template index e88e82a36af..c37bfe7f928 100644 --- a/settings.json.template +++ b/settings.json.template @@ -733,6 +733,19 @@ */ "loadTest": false, + /* + * Defer engine.io socket flush onto the next microtask so multiple + * sendPacket() calls in the same task accumulate in writeBuffer before + * the underlying transport.send drains. Pairs with engine.io's existing + * batched-send path so high-fan-out scenarios produce fewer WebSocket + * frames. Microtask deferral adds no meaningful wall-clock latency — + * microtasks drain before any subsequent macrotask. Wire bytes are + * unchanged. + * + * #7756 / #7767. Default off; production unaffected. + */ + "engineFlushDefer": false, + /** * Disable dump of objects preventing a clean exit */ diff --git a/src/node/hooks/express/socketio.ts b/src/node/hooks/express/socketio.ts index 79ef892760b..4734ee98dc3 100644 --- a/src/node/hooks/express/socketio.ts +++ b/src/node/hooks/express/socketio.ts @@ -69,6 +69,14 @@ const socketSessionMiddleware = (args: any) => (socket: any, next: Function) => }; export const expressCreateServer = (hookName:string, args:ArgsExpressType, cb:Function) => { + // Engine.io socket flush deferral (#7756 / #7767). Apply BEFORE building + // the socket.io Server so the patched Socket prototype is in effect when + // the Server creates its engine. + if (settings.engineFlushDefer === true) { + // eslint-disable-next-line @typescript-eslint/no-require-imports + require('../../utils/EngineFlushDeferral').installEngineFlushDeferral(); + } + // init socket.io and redirect all requests to the MessageHandler // there shouldn't be a browser that isn't compatible to all // transports in this list at once diff --git a/src/node/utils/EngineFlushDeferral.ts b/src/node/utils/EngineFlushDeferral.ts new file mode 100644 index 00000000000..36b3804ed2b --- /dev/null +++ b/src/node/utils/EngineFlushDeferral.ts @@ -0,0 +1,99 @@ +// Engine.io socket flush deferral — #7756 / #7767 deeper investigation +// after the simple WS transport-level packing prototype (#7772) showed +// that the writeBuffer almost never accumulates because flush() drains +// immediately on `transport.writable === true`. +// +// engine.io's Socket.sendPacket(...) ends with: +// +// this.writeBuffer.push(packet); +// if (callback) this.packetsFn.push(callback); +// this.flush(); // <-- synchronous +// +// flush() reads writeBuffer and hands it to transport.send. For +// WebSocket, transport.writable is true again within microseconds of +// each write, so each sendPacket() call drains a buffer of size 1. The +// transport.send([packets]) function then iterates packets and writes +// one WS frame per packet — which is what the polling transport's +// natural encodePayload batching avoids. +// +// This patch coalesces synchronous-task sendPacket calls onto a single +// microtask-scheduled flush. Inside the same JS task, multiple +// sendPacket() calls accumulate in writeBuffer; the queued microtask +// then calls flush() once with the whole batch. The transport's +// send([batch]) sees N > 1 packets and the WS payload-encoding fast +// path (also added by lever 8) coalesces them into one frame. +// +// Microtask deferral adds zero meaningful wall-clock latency: +// microtasks drain before the next macrotask, so any consumer waiting +// on the next setImmediate / setTimeout / I/O callback still sees the +// flush completed. +// +// Forward-compatible. Existing clients receive identical wire bytes +// because the engine.io packet encoding is unchanged; the difference +// is only how many engine.io packets share one transport-level send +// call. The WS transport's send([packets]) path is then where lever 8 +// (or this patch's accompanying engine-packing branch) decides +// whether to ship them as N frames or one payload-encoded frame. +// +// Gated by settings.engineFlushDefer. Default off; production unaffected. + +import log4js from 'log4js'; + +const logger = log4js.getLogger('engine-flush-defer'); + +let installed = false; + +const SCHEDULED = Symbol('engineFlushScheduled'); + +export const installEngineFlushDeferral = (): void => { + if (installed) return; + + let SocketProto: {sendPacket: (...a: unknown[]) => unknown}; + try { + // eslint-disable-next-line @typescript-eslint/no-require-imports + SocketProto = require('engine.io/build/socket').Socket.prototype; + } catch (err: any) { + logger.warn(`Unable to install engine.io flush deferral (module not found): ${err && err.message || err}`); + return; // Leave `installed` false so a later boot path can retry. + } + if (typeof SocketProto.sendPacket !== 'function') { + logger.warn('engine.io Socket shape unexpected; skipping flush deferral patch'); + return; // Leave `installed` false so a later boot path can retry. + } + // Only after both require and shape check succeed do we record that the + // patch is installed. Setting the flag before validation (the original + // code) would have permanently disabled retries after a transient + // require failure in test/CI environments where socket.io may load late. + installed = true; + + // Re-implementing sendPacket inline rather than wrapping the original + // so the single closing `this.flush()` becomes a microtask-coalesced + // schedule. The body is intentionally a near-verbatim copy of the + // engine.io 6.6.5 implementation so future engine.io upgrades that + // change packet-shape semantics still need re-vetting. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + SocketProto.sendPacket = function (this: any, type: any, data: any, options: any, callback: any) { + if ('function' === typeof options) { + callback = options; + options = {}; + } + if ('closing' === this.readyState || 'closed' === this.readyState) return; + + options = options || {}; + options.compress = options.compress !== false; + const packet: any = {type, options}; + if (data !== undefined) packet.data = data; + this.emit('packetCreate', packet); + this.writeBuffer.push(packet); + if ('function' === typeof callback) this.packetsFn.push(callback); + + if (this[SCHEDULED]) return; + this[SCHEDULED] = true; + queueMicrotask(() => { + this[SCHEDULED] = false; + this.flush(); + }); + }; + + logger.info('engine.io socket flush deferral enabled (#7756 / #7767)'); +}; diff --git a/src/node/utils/Settings.ts b/src/node/utils/Settings.ts index 97413004100..76a687746bd 100644 --- a/src/node/utils/Settings.ts +++ b/src/node/utils/Settings.ts @@ -272,6 +272,7 @@ export type SettingsType = { automaticReconnectionTimeout: number, loadTest: boolean, scalingDiveMetrics: boolean, + engineFlushDefer: boolean, dumpOnUncleanExit: boolean, indentationOnNewLine: boolean, logconfig: any | null, @@ -658,6 +659,19 @@ const settings: SettingsType = { * production deployments aren't paying for instrumentation they don't use. */ scalingDiveMetrics: false, + /** + * Defer engine.io socket flush onto the next microtask so multiple + * sendPacket() calls within the same task accumulate in the writeBuffer + * before drain. Pairs with engine.io's existing transport.send([packets]) + * fast path so a batched send produces fewer WebSocket frames. + * + * Adds no meaningful wall-clock latency — microtasks drain before any + * subsequent macrotask. Backward-compatible at the wire level; existing + * clients receive identical packet bytes. + * + * Default false. Enable only when scoring under the scaling dive. + */ + engineFlushDefer: false, /** * Disable dump of objects preventing a clean exit */