Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions settings.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,19 @@
*/
"loadTest": false,

/*
* Defer engine.io socket flush onto the next microtask so multiple
* sendPacket() calls in the same task accumulate in writeBuffer before
* the underlying transport.send drains. Pairs with engine.io's existing
* batched-send path so high-fan-out scenarios produce fewer WebSocket
* frames. Microtask deferral adds no meaningful wall-clock latency —
* microtasks drain before any subsequent macrotask. Wire bytes are
* unchanged.
*
* #7756 / #7767. Default off; production unaffected.
*/
"engineFlushDefer": false,

/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
8 changes: 8 additions & 0 deletions src/node/hooks/express/socketio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ const socketSessionMiddleware = (args: any) => (socket: any, next: Function) =>
};

export const expressCreateServer = (hookName:string, args:ArgsExpressType, cb:Function) => {
// Engine.io socket flush deferral (#7756 / #7767). Apply BEFORE building
// the socket.io Server so the patched Socket prototype is in effect when
// the Server creates its engine.
if (settings.engineFlushDefer === true) {
// eslint-disable-next-line @typescript-eslint/no-require-imports
require('../../utils/EngineFlushDeferral').installEngineFlushDeferral();
}

// init socket.io and redirect all requests to the MessageHandler
// there shouldn't be a browser that isn't compatible to all
// transports in this list at once
Expand Down
99 changes: 99 additions & 0 deletions src/node/utils/EngineFlushDeferral.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Engine.io socket flush deferral — #7756 / #7767 deeper investigation
// after the simple WS transport-level packing prototype (#7772) showed
// that the writeBuffer almost never accumulates because flush() drains
// immediately on `transport.writable === true`.
//
// engine.io's Socket.sendPacket(...) ends with:
//
// this.writeBuffer.push(packet);
// if (callback) this.packetsFn.push(callback);
// this.flush(); // <-- synchronous
//
// flush() reads writeBuffer and hands it to transport.send. For
// WebSocket, transport.writable is true again within microseconds of
// each write, so each sendPacket() call drains a buffer of size 1. The
// transport.send([packets]) function then iterates packets and writes
// one WS frame per packet — which is what the polling transport's
// natural encodePayload batching avoids.
//
// This patch coalesces synchronous-task sendPacket calls onto a single
// microtask-scheduled flush. Inside the same JS task, multiple
// sendPacket() calls accumulate in writeBuffer; the queued microtask
// then calls flush() once with the whole batch. The transport's
// send([batch]) sees N > 1 packets and the WS payload-encoding fast
// path (also added by lever 8) coalesces them into one frame.
//
// Microtask deferral adds zero meaningful wall-clock latency:
// microtasks drain before the next macrotask, so any consumer waiting
// on the next setImmediate / setTimeout / I/O callback still sees the
// flush completed.
//
// Forward-compatible. Existing clients receive identical wire bytes
// because the engine.io packet encoding is unchanged; the difference
// is only how many engine.io packets share one transport-level send
// call. The WS transport's send([packets]) path is then where lever 8
// (or this patch's accompanying engine-packing branch) decides
// whether to ship them as N frames or one payload-encoded frame.
//
// Gated by settings.engineFlushDefer. Default off; production unaffected.

import log4js from 'log4js';

const logger = log4js.getLogger('engine-flush-defer');

let installed = false;

const SCHEDULED = Symbol('engineFlushScheduled');

export const installEngineFlushDeferral = (): void => {
if (installed) return;

let SocketProto: {sendPacket: (...a: unknown[]) => unknown};
try {
// eslint-disable-next-line @typescript-eslint/no-require-imports
SocketProto = require('engine.io/build/socket').Socket.prototype;
} catch (err: any) {
logger.warn(`Unable to install engine.io flush deferral (module not found): ${err && err.message || err}`);
return; // Leave `installed` false so a later boot path can retry.
}
if (typeof SocketProto.sendPacket !== 'function') {
logger.warn('engine.io Socket shape unexpected; skipping flush deferral patch');
return; // Leave `installed` false so a later boot path can retry.
}
// Only after both require and shape check succeed do we record that the
// patch is installed. Setting the flag before validation (the original
// code) would have permanently disabled retries after a transient
// require failure in test/CI environments where socket.io may load late.
installed = true;

// Re-implementing sendPacket inline rather than wrapping the original
// so the single closing `this.flush()` becomes a microtask-coalesced
// schedule. The body is intentionally a near-verbatim copy of the
// engine.io 6.6.5 implementation so future engine.io upgrades that
// change packet-shape semantics still need re-vetting.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
SocketProto.sendPacket = function (this: any, type: any, data: any, options: any, callback: any) {
if ('function' === typeof options) {
callback = options;
options = {};
}
if ('closing' === this.readyState || 'closed' === this.readyState) return;

options = options || {};
options.compress = options.compress !== false;
const packet: any = {type, options};
if (data !== undefined) packet.data = data;
this.emit('packetCreate', packet);
this.writeBuffer.push(packet);
if ('function' === typeof callback) this.packetsFn.push(callback);

if (this[SCHEDULED]) return;
this[SCHEDULED] = true;
queueMicrotask(() => {
this[SCHEDULED] = false;
this.flush();
});
};

logger.info('engine.io socket flush deferral enabled (#7756 / #7767)');
};
14 changes: 14 additions & 0 deletions src/node/utils/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ export type SettingsType = {
automaticReconnectionTimeout: number,
loadTest: boolean,
scalingDiveMetrics: boolean,
engineFlushDefer: boolean,
dumpOnUncleanExit: boolean,
indentationOnNewLine: boolean,
logconfig: any | null,
Expand Down Expand Up @@ -658,6 +659,19 @@ const settings: SettingsType = {
* production deployments aren't paying for instrumentation they don't use.
*/
scalingDiveMetrics: false,
/**
* Defer engine.io socket flush onto the next microtask so multiple
* sendPacket() calls within the same task accumulate in the writeBuffer
* before drain. Pairs with engine.io's existing transport.send([packets])
* fast path so a batched send produces fewer WebSocket frames.
*
* Adds no meaningful wall-clock latency — microtasks drain before any
* subsequent macrotask. Backward-compatible at the wire level; existing
* clients receive identical packet bytes.
*
* Default false. Enable only when scoring under the scaling dive.
*/
engineFlushDefer: false,
Comment thread
qodo-free-for-open-source-projects[bot] marked this conversation as resolved.
/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
Loading