-
-
Notifications
You must be signed in to change notification settings - Fork 3k
feat(scaling): engine.io socket flush deferral — modest tail-latency reduction at mid-load #7774
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
JohnMcLear
wants to merge
2
commits into
develop
Choose a base branch
from
feat/engine-io-flush-deferral
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+134
−0
Draft
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| // Engine.io socket flush deferral — #7756 / #7767 deeper investigation | ||
| // after the simple WS transport-level packing prototype (#7772) showed | ||
| // that the writeBuffer almost never accumulates because flush() drains | ||
| // immediately on `transport.writable === true`. | ||
| // | ||
| // engine.io's Socket.sendPacket(...) ends with: | ||
| // | ||
| // this.writeBuffer.push(packet); | ||
| // if (callback) this.packetsFn.push(callback); | ||
| // this.flush(); // <-- synchronous | ||
| // | ||
| // flush() reads writeBuffer and hands it to transport.send. For | ||
| // WebSocket, transport.writable is true again within microseconds of | ||
| // each write, so each sendPacket() call drains a buffer of size 1. The | ||
| // transport.send([packets]) function then iterates packets and writes | ||
| // one WS frame per packet — which is what the polling transport's | ||
| // natural encodePayload batching avoids. | ||
| // | ||
| // This patch coalesces synchronous-task sendPacket calls onto a single | ||
| // microtask-scheduled flush. Inside the same JS task, multiple | ||
| // sendPacket() calls accumulate in writeBuffer; the queued microtask | ||
| // then calls flush() once with the whole batch. The transport's | ||
| // send([batch]) sees N > 1 packets and the WS payload-encoding fast | ||
| // path (also added by lever 8) coalesces them into one frame. | ||
| // | ||
| // Microtask deferral adds zero meaningful wall-clock latency: | ||
| // microtasks drain before the next macrotask, so any consumer waiting | ||
| // on the next setImmediate / setTimeout / I/O callback still sees the | ||
| // flush completed. | ||
| // | ||
| // Forward-compatible. Existing clients receive identical wire bytes | ||
| // because the engine.io packet encoding is unchanged; the difference | ||
| // is only how many engine.io packets share one transport-level send | ||
| // call. The WS transport's send([packets]) path is then where lever 8 | ||
| // (or this patch's accompanying engine-packing branch) decides | ||
| // whether to ship them as N frames or one payload-encoded frame. | ||
| // | ||
| // Gated by settings.engineFlushDefer. Default off; production unaffected. | ||
|
|
||
| import log4js from 'log4js'; | ||
|
|
||
| const logger = log4js.getLogger('engine-flush-defer'); | ||
|
|
||
| let installed = false; | ||
|
|
||
| const SCHEDULED = Symbol('engineFlushScheduled'); | ||
|
|
||
| export const installEngineFlushDeferral = (): void => { | ||
| if (installed) return; | ||
|
|
||
| let SocketProto: {sendPacket: (...a: unknown[]) => unknown}; | ||
| try { | ||
| // eslint-disable-next-line @typescript-eslint/no-require-imports | ||
| SocketProto = require('engine.io/build/socket').Socket.prototype; | ||
| } catch (err: any) { | ||
| logger.warn(`Unable to install engine.io flush deferral (module not found): ${err && err.message || err}`); | ||
| return; // Leave `installed` false so a later boot path can retry. | ||
| } | ||
| if (typeof SocketProto.sendPacket !== 'function') { | ||
| logger.warn('engine.io Socket shape unexpected; skipping flush deferral patch'); | ||
| return; // Leave `installed` false so a later boot path can retry. | ||
| } | ||
| // Only after both require and shape check succeed do we record that the | ||
| // patch is installed. Setting the flag before validation (the original | ||
| // code) would have permanently disabled retries after a transient | ||
| // require failure in test/CI environments where socket.io may load late. | ||
| installed = true; | ||
|
|
||
| // Re-implementing sendPacket inline rather than wrapping the original | ||
| // so the single closing `this.flush()` becomes a microtask-coalesced | ||
| // schedule. The body is intentionally a near-verbatim copy of the | ||
| // engine.io 6.6.5 implementation so future engine.io upgrades that | ||
| // change packet-shape semantics still need re-vetting. | ||
| // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
| SocketProto.sendPacket = function (this: any, type: any, data: any, options: any, callback: any) { | ||
| if ('function' === typeof options) { | ||
| callback = options; | ||
| options = {}; | ||
| } | ||
| if ('closing' === this.readyState || 'closed' === this.readyState) return; | ||
|
|
||
| options = options || {}; | ||
| options.compress = options.compress !== false; | ||
| const packet: any = {type, options}; | ||
| if (data !== undefined) packet.data = data; | ||
| this.emit('packetCreate', packet); | ||
| this.writeBuffer.push(packet); | ||
| if ('function' === typeof callback) this.packetsFn.push(callback); | ||
|
|
||
| if (this[SCHEDULED]) return; | ||
| this[SCHEDULED] = true; | ||
| queueMicrotask(() => { | ||
| this[SCHEDULED] = false; | ||
| this.flush(); | ||
| }); | ||
| }; | ||
|
|
||
| logger.info('engine.io socket flush deferral enabled (#7756 / #7767)'); | ||
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.