diff --git a/settings.json.template b/settings.json.template index e88e82a36af..87bcc3f9ad5 100644 --- a/settings.json.template +++ b/settings.json.template @@ -733,6 +733,18 @@ */ "loadTest": false, + /* + * Pack multiple NEW_CHANGES revisions into a single NEW_CHANGES_BATCH emit + * per recipient when a fan-out has more than one revision to catch up + * (#7756 lever 3b). Reduces engine.io packet count under high pad + * concurrency, especially on the WebSocket transport. + * + * WARNING: enabling this requires all connected clients to understand + * NEW_CHANGES_BATCH. Old clients will silently fail to apply batched + * revisions. Coordinate the rollout. + */ + "newChangesBatch": false, + /** * Disable dump of objects preventing a clean exit */ diff --git a/src/node/handler/NewChangesPacker.ts b/src/node/handler/NewChangesPacker.ts new file mode 100644 index 00000000000..e952c343930 --- /dev/null +++ b/src/node/handler/NewChangesPacker.ts @@ -0,0 +1,41 @@ +// Wire-format decision for NEW_CHANGES vs NEW_CHANGES_BATCH (#7756 lever 3b). +// +// Lives in its own tiny module rather than inside PadMessageHandler so the +// pure decision can be unit-tested without standing up the full pad / DB / +// socket.io stack. PadMessageHandler.updatePadClients calls this function +// once per recipient with the queued revisions for that recipient. + +export type NewChangesItem = { + newRev: number; + changeset: string; + apool: unknown; + author: string; + currentTime: number; + timeDelta: number; +}; + +export type NewChangesEmit = + | {type: 'COLLABROOM'; data: {type: 'NEW_CHANGES'} & NewChangesItem} + | {type: 'COLLABROOM'; data: {type: 'NEW_CHANGES_BATCH'; changes: NewChangesItem[]}}; + +/** + * Decide what to put on the wire for one recipient. + * - No queued revisions: nothing. + * - Batching disabled, or exactly one rev: emit one NEW_CHANGES per rev + * (legacy behaviour; preserves bytes-on-wire for the steady state). 
+ * - Batching enabled and multiple revs: emit one NEW_CHANGES_BATCH with + * the array of revisions. + */ +export const buildNewChangesEmits = ( + pending: NewChangesItem[], + batchEnabled: boolean, +): NewChangesEmit[] => { + if (pending.length === 0) return []; + if (batchEnabled && pending.length > 1) { + return [{type: 'COLLABROOM', data: {type: 'NEW_CHANGES_BATCH', changes: pending}}]; + } + return pending.map((change) => ({ + type: 'COLLABROOM', + data: {type: 'NEW_CHANGES', ...change}, + } as NewChangesEmit)); +}; diff --git a/src/node/handler/PadMessageHandler.ts b/src/node/handler/PadMessageHandler.ts index 3a27a7ac7be..43a000aa0d8 100644 --- a/src/node/handler/PadMessageHandler.ts +++ b/src/node/handler/PadMessageHandler.ts @@ -48,6 +48,7 @@ const hooks = require('../../static/js/pluginfw/hooks'); const stats = require('../stats') const assert = require('assert').strict; import {recordChangesetApply, recordSocketEmit} from '../prom-instruments'; +import {buildNewChangesEmits, type NewChangesItem} from './NewChangesPacker'; import {RateLimiterMemory} from 'rate-limiter-flexible'; import {ChangesetRequest, PadUserInfo, SocketClientRequest} from "../types/SocketClientRequest"; import {APool, AText, PadAuthor, PadType} from "../types/PadType"; @@ -964,45 +965,76 @@ exports.updatePadClients = async (pad: PadType) => { // but benefit of reusing cached revision object is HUGE const revCache:MapArrayType = {}; + // When `settings.newChangesBatch` is true and a recipient is more than one + // revision behind, pack the queued revisions into a single NEW_CHANGES_BATCH + // emit per recipient. The engine.io WebSocket transport sends one frame per + // packet (the polling transport already batches at the HTTP-response layer), + // so reducing the packet count translates directly into fewer system calls + // on the server and fewer onmessage callbacks on the client. 
+ const batchEnabled = settings.newChangesBatch === true; + await Promise.all(roomSockets.map(async (socket) => { const sessioninfo = sessioninfos[socket.id]; // The user might have disconnected since _getRoomSockets() was called. if (sessioninfo == null) return; - while (sessioninfo.rev < pad.getHeadRevisionNumber()) { - const r = sessioninfo.rev + 1; - let revision = revCache[r]; - if (!revision) { - revision = await pad.getRevision(r); - revCache[r] = revision; - } - - const author = revision.meta.author; - const revChangeset = revision.changeset; - const currentTime = revision.meta.timestamp; + // Snapshot the local state so a concurrent updatePadClients() can't make + // us double-emit. We hold the "I'm responsible for revs (startRev, + // headRev]" claim by reading sessioninfo.rev once and overwriting it + // before any await. A second invocation arriving mid-loop will see the + // bumped rev and skip those revisions; if our emit fails the catch + // below rolls sessioninfo.rev back so they aren't lost. + const startRev = sessioninfo.rev; + const headRev = pad.getHeadRevisionNumber(); + if (startRev >= headRev) return; + const startTime = sessioninfo.time; + // Claim the range immediately so concurrent runs skip it. + sessioninfo.rev = headRev; + + // Collect all queued revisions for this socket. 
+ const pending: Array<{ + newRev: number; + changeset: string; + apool: unknown; + author: string; + currentTime: number; + timeDelta: number; + }> = []; - const forWire = prepareForWire(revChangeset, pad.pool); - const msg = { - type: 'COLLABROOM', - data: { - type: 'NEW_CHANGES', + try { + let previousTime = startTime; + for (let r = startRev + 1; r <= headRev; r++) { + let revision = revCache[r]; + if (!revision) { + revision = await pad.getRevision(r); + revCache[r] = revision; + } + const author = revision.meta.author; + const revChangeset = revision.changeset; + const currentTime = revision.meta.timestamp; + const forWire = prepareForWire(revChangeset, pad.pool); + pending.push({ newRev: r, changeset: forWire.translated, apool: forWire.pool, author, currentTime, - timeDelta: currentTime - sessioninfo.time, - }, - }; - try { - socket.emit('message', msg); - recordSocketEmit('NEW_CHANGES'); - } catch (err:any) { - messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`); - return; + timeDelta: currentTime - previousTime, + }); + previousTime = currentTime; + } + + for (const emit of buildNewChangesEmits(pending, batchEnabled)) { + socket.emit('message', emit); + recordSocketEmit(emit.data.type); } - sessioninfo.time = currentTime; - sessioninfo.rev = r; + // Only after the wire send succeeds do we commit the new time. + sessioninfo.time = previousTime; + } catch (err: any) { + // Roll back the claim so the next updatePadClients retries these revs. + // Only set rev back if no one else has advanced past us in the meantime. + if (sessioninfo.rev === headRev) sessioninfo.rev = startRev; + messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`); } })); }; diff --git a/src/node/prom-instruments.ts b/src/node/prom-instruments.ts index ebb54018d2d..fd998c45f12 100644 --- a/src/node/prom-instruments.ts +++ b/src/node/prom-instruments.ts @@ -40,6 +40,7 @@ export const padUsersGauge = new client.Gauge({ // state. 
const KNOWN_TYPES = new Set([ 'NEW_CHANGES', + 'NEW_CHANGES_BATCH', 'ACCEPT_COMMIT', 'CHAT_MESSAGE', 'CLIENT_VARS', diff --git a/src/node/utils/Settings.ts b/src/node/utils/Settings.ts index 97413004100..f74eeecdb4e 100644 --- a/src/node/utils/Settings.ts +++ b/src/node/utils/Settings.ts @@ -272,6 +272,7 @@ export type SettingsType = { automaticReconnectionTimeout: number, loadTest: boolean, scalingDiveMetrics: boolean, + newChangesBatch: boolean, dumpOnUncleanExit: boolean, indentationOnNewLine: boolean, logconfig: any | null, @@ -658,6 +659,21 @@ const settings: SettingsType = { * production deployments aren't paying for instrumentation they don't use. */ scalingDiveMetrics: false, + /** + * Pack multiple NEW_CHANGES revisions into a single NEW_CHANGES_BATCH emit + * per recipient when a fan-out has more than one revision to catch up + * (#7756 lever 3b). Reduces engine.io packet count under high pad + * concurrency, especially on the WebSocket transport, which sends one + * frame per packet (the polling transport already batches naturally). + * + * Requires clients to recognise the NEW_CHANGES_BATCH message type. New + * clients are forward-compatible (they handle both NEW_CHANGES and + * NEW_CHANGES_BATCH). Old clients connecting to a server with this enabled + * would fail to apply batched revisions, so coordinate the rollout. + * + * false (default) preserves the legacy per-revision emit behaviour. 
+ */ + newChangesBatch: false, /** * Disable dump of objects preventing a clean exit */ diff --git a/src/static/js/broadcast.ts b/src/static/js/broadcast.ts index 8551f1d0cfa..01262e49a3e 100644 --- a/src/static/js/broadcast.ts +++ b/src/static/js/broadcast.ts @@ -493,17 +493,23 @@ const loadBroadcastJS = (socket, sendSocketMsg, fireWhenAllScriptsAreLoaded, Bro if (obj.type === 'COLLABROOM') { obj = obj.data; - if (obj.type === 'NEW_CHANGES') { - const changeset = moveOpsToNewPool( - obj.changeset, (new AttribPool()).fromJsonable(obj.apool), padContents.apool); - - let changesetBack = inverse( - obj.changeset, padContents.currentLines, padContents.alines, padContents.apool); - - changesetBack = moveOpsToNewPool( - changesetBack, (new AttribPool()).fromJsonable(obj.apool), padContents.apool); - - loadedNewChangeset(changeset, changesetBack, obj.newRev - 1, obj.timeDelta); + if (obj.type === 'NEW_CHANGES' || obj.type === 'NEW_CHANGES_BATCH') { + // NEW_CHANGES_BATCH (#7756 lever 3b) carries an array of revisions + // in one emit. Each revision has the same shape as the legacy + // single-rev message; apply in order. + const changes = obj.type === 'NEW_CHANGES_BATCH' ? 
obj.changes : [obj]; + for (const change of changes) { + const changeset = moveOpsToNewPool( + change.changeset, (new AttribPool()).fromJsonable(change.apool), padContents.apool); + + let changesetBack = inverse( + change.changeset, padContents.currentLines, padContents.alines, padContents.apool); + + changesetBack = moveOpsToNewPool( + changesetBack, (new AttribPool()).fromJsonable(change.apool), padContents.apool); + + loadedNewChangeset(changeset, changesetBack, change.newRev - 1, change.timeDelta); + } } else if (obj.type === 'NEW_AUTHORDATA') { const authorMap = {}; authorMap[obj.author] = obj.data; diff --git a/src/static/js/collab_client.ts b/src/static/js/collab_client.ts index c90f92e80d3..67228095921 100644 --- a/src/static/js/collab_client.ts +++ b/src/static/js/collab_client.ts @@ -188,7 +188,12 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad) if (wrapper.type !== 'COLLABROOM' && wrapper.type !== 'CUSTOM') return; const msg = wrapper.data; - if (msg.type === 'NEW_CHANGES') { + if (msg.type === 'NEW_CHANGES' || msg.type === 'NEW_CHANGES_BATCH') { + // NEW_CHANGES_BATCH (added in #7756 lever 3b) carries an array of + // revisions in one emit. Each revision has the same shape as the + // legacy single-rev message, so we normalise to a list and apply in + // order, sharing the same compose-safety await. + const changes = msg.type === 'NEW_CHANGES_BATCH' ? msg.changes : [msg]; serverMessageTaskQueue.enqueue(async () => { // Avoid updating the DOM while the user is composing a character. Notes about this `await`: // * `await null;` is equivalent to `await Promise.resolve(null);`, so if the user is not @@ -198,14 +203,16 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad) // possible, that the chances are so small or the consequences so minor that it's not // worth addressing). 
await editor.getInInternationalComposition(); - const {newRev, changeset, author = '', apool} = msg; - if (newRev !== (rev + 1)) { - window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${rev + 1}`); - // setChannelState("DISCONNECTED", "badmessage_newchanges"); - return; + for (const change of changes) { + const {newRev, changeset, author = '', apool} = change; + if (newRev !== (rev + 1)) { + window.console.warn(`bad message revision on ${msg.type}: ${newRev} not ${rev + 1}`); + // setChannelState("DISCONNECTED", "badmessage_newchanges"); + return; + } + rev = newRev; + editor.applyChangesToBase(changeset, author, apool); } - rev = newRev; - editor.applyChangesToBase(changeset, author, apool); }); } else if (msg.type === 'ACCEPT_COMMIT') { serverMessageTaskQueue.enqueue(() => { diff --git a/src/static/js/types/SocketIOMessage.ts b/src/static/js/types/SocketIOMessage.ts index 914c5c4721a..922363ae68a 100644 --- a/src/static/js/types/SocketIOMessage.ts +++ b/src/static/js/types/SocketIOMessage.ts @@ -128,6 +128,20 @@ export type ClientNewChanges = { payload?: ClientNewChanges } +export type NewChangesItem = { + apool: AttributePool, + author: string, + changeset: string, + currentTime: number, + newRev: number, + timeDelta: number, +} + +export type ClientNewChangesBatch = { + type: 'NEW_CHANGES_BATCH', + changes: NewChangesItem[], +} + export type ClientAcceptCommitMessage = { type: 'ACCEPT_COMMIT' newRev: number diff --git a/src/tests/backend-new/specs/new-changes-batch.test.ts b/src/tests/backend-new/specs/new-changes-batch.test.ts new file mode 100644 index 00000000000..b9fac90a0ff --- /dev/null +++ b/src/tests/backend-new/specs/new-changes-batch.test.ts @@ -0,0 +1,48 @@ +// Regression test for the NEW_CHANGES_BATCH wire-format decision +// (#7756 lever 3b). Imports the real implementation from +// NewChangesPacker so removing or breaking the production batching +// logic fails this test. 
+ +import {describe, it, expect, beforeEach, afterEach} from 'vitest'; +import settings from '../../../node/utils/Settings'; +import {buildNewChangesEmits, type NewChangesItem} from '../../../node/handler/NewChangesPacker'; + +const ORIGINAL_FLAG = settings.newChangesBatch; + +beforeEach(() => { settings.newChangesBatch = false; }); +afterEach(() => { settings.newChangesBatch = ORIGINAL_FLAG; }); + +const fakePending = (n: number): NewChangesItem[] => + Array.from({length: n}, (_, i) => ({ + newRev: i + 1, changeset: `=${i}`, apool: {}, author: 'a.1', + currentTime: 1_000 * (i + 1), timeDelta: 1_000, + })); + +describe('buildNewChangesEmits', () => { + it('flag OFF: one NEW_CHANGES per rev regardless of count', () => { + const emits = buildNewChangesEmits(fakePending(5), false); + expect(emits).toHaveLength(5); + expect(emits.every((e) => e.data.type === 'NEW_CHANGES')).toBe(true); + }); + + it('flag ON, single rev: still NEW_CHANGES (no batch overhead for the steady state)', () => { + const emits = buildNewChangesEmits(fakePending(1), true); + expect(emits).toHaveLength(1); + expect(emits[0]!.data.type).toBe('NEW_CHANGES'); + }); + + it('flag ON, multiple revs: a single NEW_CHANGES_BATCH carrying all of them', () => { + const emits = buildNewChangesEmits(fakePending(5), true); + expect(emits).toHaveLength(1); + expect(emits[0]!.data.type).toBe('NEW_CHANGES_BATCH'); + const batch = emits[0]!.data as {type: 'NEW_CHANGES_BATCH'; changes: NewChangesItem[]}; + expect(batch.changes).toHaveLength(5); + expect(batch.changes[0]!.newRev).toBe(1); + expect(batch.changes[4]!.newRev).toBe(5); + }); + + it('empty pending list emits nothing', () => { + expect(buildNewChangesEmits([], true)).toEqual([]); + expect(buildNewChangesEmits([], false)).toEqual([]); + }); +});