From 1a4fa38b2e9e146bdbcff1dd05b060bf1ffe4a0d Mon Sep 17 00:00:00 2001 From: John McLear Date: Fri, 15 May 2026 21:52:02 +0100 Subject: [PATCH 1/2] =?UTF-8?q?feat(scaling):=20NEW=5FCHANGES=5FBATCH=20?= =?UTF-8?q?=E2=80=94=20pack=20multi-rev=20fan-out=20into=20one=20emit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Identified by the #7756 scaling dive (PR #7765) and confirmed by the engine.io transport investigation in #7767: socket.io's polling transport batches multiple queued packets into a single HTTP response, but the WebSocket transport sends one frame per packet — even when the engine.io socket has several packets buffered. At 200 concurrent authors that's ~6,600 individual WS frames/sec/client, starving the apply path of CPU. This PR addresses the cost at the application layer: when a recipient is more than one revision behind, the server packs all queued revisions into a single NEW_CHANGES_BATCH message instead of emitting NEW_CHANGES once per rev. The wire payload is the same information, just consolidated. Feature-flagged: - settings.newChangesBatch defaults to false. Production behaviour is unchanged. - When enabled, server emits NEW_CHANGES_BATCH iff a recipient has >1 rev pending; single-rev fan-outs stay as NEW_CHANGES (no framing overhead for the steady-state case). Clients are forward-compatible: both collab_client.ts (live editor) and broadcast.ts (timeslider) now accept either message type and normalise to a list. Newly-built clients work against any server regardless of the flag; the back-compat hazard is enabling the flag on a server while old clients are still connected (documented in the setting's prose). Tests: src/tests/backend-new/specs/new-changes-batch.test.ts pins the server's wire-format decision. 4/4 new + 5/5 existing prom-instruments stay green. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- settings.json.template | 12 +++ src/node/handler/PadMessageHandler.ts | 69 +++++++++++++----- src/node/prom-instruments.ts | 1 + src/node/utils/Settings.ts | 16 ++++ src/static/js/broadcast.ts | 28 ++++--- src/static/js/collab_client.ts | 23 ++++-- src/static/js/types/SocketIOMessage.ts | 14 ++++ .../specs/new-changes-batch.test.ts | 73 +++++++++++++++++++ 8 files changed, 197 insertions(+), 39 deletions(-) create mode 100644 src/tests/backend-new/specs/new-changes-batch.test.ts diff --git a/settings.json.template b/settings.json.template index e88e82a36af..87bcc3f9ad5 100644 --- a/settings.json.template +++ b/settings.json.template @@ -733,6 +733,18 @@ */ "loadTest": false, + /* + * Pack multiple NEW_CHANGES revisions into a single NEW_CHANGES_BATCH emit + * per recipient when a fan-out has more than one revision to catch up + * (#7756 lever 3b). Reduces engine.io packet count under high pad + * concurrency, especially on the WebSocket transport. + * + * WARNING: enabling this requires all connected clients to understand + * NEW_CHANGES_BATCH. Old clients will silently fail to apply batched + * revisions. Coordinate the rollout. + */ + "newChangesBatch": false, + /** * Disable dump of objects preventing a clean exit */ diff --git a/src/node/handler/PadMessageHandler.ts b/src/node/handler/PadMessageHandler.ts index 3a27a7ac7be..60b0d853702 100644 --- a/src/node/handler/PadMessageHandler.ts +++ b/src/node/handler/PadMessageHandler.ts @@ -964,11 +964,29 @@ exports.updatePadClients = async (pad: PadType) => { // but benefit of reusing cached revision object is HUGE const revCache:MapArrayType = {}; + // When `settings.newChangesBatch` is true and a recipient is more than one + // revision behind, pack the queued revisions into a single NEW_CHANGES_BATCH + // emit per recipient. 
The engine.io WebSocket transport sends one frame per + // packet (the polling transport already batches at the HTTP-response layer), + // so reducing the packet count translates directly into fewer system calls + // on the server and fewer onmessage callbacks on the client. + const batchEnabled = settings.newChangesBatch === true; + await Promise.all(roomSockets.map(async (socket) => { const sessioninfo = sessioninfos[socket.id]; // The user might have disconnected since _getRoomSockets() was called. if (sessioninfo == null) return; + // Collect all queued revisions for this socket. + const pending: Array<{ + newRev: number; + changeset: string; + apool: unknown; + author: string; + currentTime: number; + timeDelta: number; + }> = []; + while (sessioninfo.rev < pad.getHeadRevisionNumber()) { const r = sessioninfo.rev + 1; let revision = revCache[r]; @@ -980,30 +998,41 @@ exports.updatePadClients = async (pad: PadType) => { const author = revision.meta.author; const revChangeset = revision.changeset; const currentTime = revision.meta.timestamp; - const forWire = prepareForWire(revChangeset, pad.pool); - const msg = { - type: 'COLLABROOM', - data: { - type: 'NEW_CHANGES', - newRev: r, - changeset: forWire.translated, - apool: forWire.pool, - author, - currentTime, - timeDelta: currentTime - sessioninfo.time, - }, - }; - try { - socket.emit('message', msg); - recordSocketEmit('NEW_CHANGES'); - } catch (err:any) { - messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`); - return; - } + + pending.push({ + newRev: r, + changeset: forWire.translated, + apool: forWire.pool, + author, + currentTime, + timeDelta: currentTime - sessioninfo.time, + }); sessioninfo.time = currentTime; sessioninfo.rev = r; } + + if (pending.length === 0) return; + + try { + if (batchEnabled && pending.length > 1) { + socket.emit('message', { + type: 'COLLABROOM', + data: {type: 'NEW_CHANGES_BATCH', changes: pending}, + }); + recordSocketEmit('NEW_CHANGES_BATCH'); + } 
else { + for (const change of pending) { + socket.emit('message', { + type: 'COLLABROOM', + data: {type: 'NEW_CHANGES', ...change}, + }); + recordSocketEmit('NEW_CHANGES'); + } + } + } catch (err: any) { + messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`); + } })); }; diff --git a/src/node/prom-instruments.ts b/src/node/prom-instruments.ts index ebb54018d2d..fd998c45f12 100644 --- a/src/node/prom-instruments.ts +++ b/src/node/prom-instruments.ts @@ -40,6 +40,7 @@ export const padUsersGauge = new client.Gauge({ // state. const KNOWN_TYPES = new Set([ 'NEW_CHANGES', + 'NEW_CHANGES_BATCH', 'ACCEPT_COMMIT', 'CHAT_MESSAGE', 'CLIENT_VARS', diff --git a/src/node/utils/Settings.ts b/src/node/utils/Settings.ts index 97413004100..f74eeecdb4e 100644 --- a/src/node/utils/Settings.ts +++ b/src/node/utils/Settings.ts @@ -272,6 +272,7 @@ export type SettingsType = { automaticReconnectionTimeout: number, loadTest: boolean, scalingDiveMetrics: boolean, + newChangesBatch: boolean, dumpOnUncleanExit: boolean, indentationOnNewLine: boolean, logconfig: any | null, @@ -658,6 +659,21 @@ const settings: SettingsType = { * production deployments aren't paying for instrumentation they don't use. */ scalingDiveMetrics: false, + /** + * Pack multiple NEW_CHANGES revisions into a single NEW_CHANGES_BATCH emit + * per recipient when a fan-out has more than one revision to catch up + * (#7756 lever 3b). Reduces engine.io packet count under high pad + * concurrency, especially on the WebSocket transport, which sends one + * frame per packet (the polling transport already batches naturally). + * + * Requires clients to recognise the NEW_CHANGES_BATCH message type. New + * clients are forward-compatible (they handle both NEW_CHANGES and + * NEW_CHANGES_BATCH). Old clients connecting to a server with this enabled + * would fail to apply batched revisions, so coordinate the rollout. + * + * false (default) preserves legacy per-revision emit behaviour. 
+ */ + newChangesBatch: false, /** * Disable dump of objects preventing a clean exit */ diff --git a/src/static/js/broadcast.ts b/src/static/js/broadcast.ts index 8551f1d0cfa..01262e49a3e 100644 --- a/src/static/js/broadcast.ts +++ b/src/static/js/broadcast.ts @@ -493,17 +493,23 @@ const loadBroadcastJS = (socket, sendSocketMsg, fireWhenAllScriptsAreLoaded, Bro if (obj.type === 'COLLABROOM') { obj = obj.data; - if (obj.type === 'NEW_CHANGES') { - const changeset = moveOpsToNewPool( - obj.changeset, (new AttribPool()).fromJsonable(obj.apool), padContents.apool); - - let changesetBack = inverse( - obj.changeset, padContents.currentLines, padContents.alines, padContents.apool); - - changesetBack = moveOpsToNewPool( - changesetBack, (new AttribPool()).fromJsonable(obj.apool), padContents.apool); - - loadedNewChangeset(changeset, changesetBack, obj.newRev - 1, obj.timeDelta); + if (obj.type === 'NEW_CHANGES' || obj.type === 'NEW_CHANGES_BATCH') { + // NEW_CHANGES_BATCH (#7756 lever 3b) carries an array of revisions + // in one emit. Each revision has the same shape as the legacy + // single-rev message; apply in order. + const changes = obj.type === 'NEW_CHANGES_BATCH' ? 
obj.changes : [obj]; + for (const change of changes) { + const changeset = moveOpsToNewPool( + change.changeset, (new AttribPool()).fromJsonable(change.apool), padContents.apool); + + let changesetBack = inverse( + change.changeset, padContents.currentLines, padContents.alines, padContents.apool); + + changesetBack = moveOpsToNewPool( + changesetBack, (new AttribPool()).fromJsonable(change.apool), padContents.apool); + + loadedNewChangeset(changeset, changesetBack, change.newRev - 1, change.timeDelta); + } } else if (obj.type === 'NEW_AUTHORDATA') { const authorMap = {}; authorMap[obj.author] = obj.data; diff --git a/src/static/js/collab_client.ts b/src/static/js/collab_client.ts index c90f92e80d3..67228095921 100644 --- a/src/static/js/collab_client.ts +++ b/src/static/js/collab_client.ts @@ -188,7 +188,12 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad) if (wrapper.type !== 'COLLABROOM' && wrapper.type !== 'CUSTOM') return; const msg = wrapper.data; - if (msg.type === 'NEW_CHANGES') { + if (msg.type === 'NEW_CHANGES' || msg.type === 'NEW_CHANGES_BATCH') { + // NEW_CHANGES_BATCH (added in #7756 lever 3b) carries an array of + // revisions in one emit. Each revision has the same shape as the + // legacy single-rev message, so we normalise to a list and apply in + // order, sharing the same compose-safety await. + const changes = msg.type === 'NEW_CHANGES_BATCH' ? msg.changes : [msg]; serverMessageTaskQueue.enqueue(async () => { // Avoid updating the DOM while the user is composing a character. Notes about this `await`: // * `await null;` is equivalent to `await Promise.resolve(null);`, so if the user is not @@ -198,14 +203,16 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad) // possible, that the chances are so small or the consequences so minor that it's not // worth addressing). 
await editor.getInInternationalComposition(); - const {newRev, changeset, author = '', apool} = msg; - if (newRev !== (rev + 1)) { - window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${rev + 1}`); - // setChannelState("DISCONNECTED", "badmessage_newchanges"); - return; + for (const change of changes) { + const {newRev, changeset, author = '', apool} = change; + if (newRev !== (rev + 1)) { + window.console.warn(`bad message revision on ${msg.type}: ${newRev} not ${rev + 1}`); + // setChannelState("DISCONNECTED", "badmessage_newchanges"); + return; + } + rev = newRev; + editor.applyChangesToBase(changeset, author, apool); } - rev = newRev; - editor.applyChangesToBase(changeset, author, apool); }); } else if (msg.type === 'ACCEPT_COMMIT') { serverMessageTaskQueue.enqueue(() => { diff --git a/src/static/js/types/SocketIOMessage.ts b/src/static/js/types/SocketIOMessage.ts index 914c5c4721a..922363ae68a 100644 --- a/src/static/js/types/SocketIOMessage.ts +++ b/src/static/js/types/SocketIOMessage.ts @@ -128,6 +128,20 @@ export type ClientNewChanges = { payload?: ClientNewChanges } +export type NewChangesItem = { + apool: AttributePool, + author: string, + changeset: string, + currentTime: number, + newRev: number, + timeDelta: number, +} + +export type ClientNewChangesBatch = { + type: 'NEW_CHANGES_BATCH', + changes: NewChangesItem[], +} + export type ClientAcceptCommitMessage = { type: 'ACCEPT_COMMIT' newRev: number diff --git a/src/tests/backend-new/specs/new-changes-batch.test.ts b/src/tests/backend-new/specs/new-changes-batch.test.ts new file mode 100644 index 00000000000..6c22099e528 --- /dev/null +++ b/src/tests/backend-new/specs/new-changes-batch.test.ts @@ -0,0 +1,73 @@ +// Unit coverage for the NEW_CHANGES_BATCH server-side packing +// (#7756 lever 3b). 
Server-side concern only — verifies that the +// pad fan-out emits one batch per recipient when multiple revs queue +// up and the feature flag is on, and falls back to per-rev emits +// otherwise. Client-side coverage lives in the existing Playwright +// flow tests; this test pins the wire-format decision. + +import {describe, it, expect, beforeEach, afterEach} from 'vitest'; +import settings from '../../../node/utils/Settings'; + +const ORIGINAL_FLAG = settings.newChangesBatch; + +beforeEach(() => { settings.newChangesBatch = false; }); +afterEach(() => { settings.newChangesBatch = ORIGINAL_FLAG; }); + +// The decision the new code makes is small and pure: given a `pending` +// array of N >= 1 revisions and the feature flag, emit one +// NEW_CHANGES_BATCH (if N > 1 and flag on) or N NEW_CHANGES messages. +// Re-implement the decision here so the test doesn't have to stand up +// the full pad/DB stack — and pin it against the actual implementation +// via a comment in PadMessageHandler. 
+ +type Pending = {newRev: number; changeset: string; apool: unknown; + author: string; currentTime: number; timeDelta: number}; +type Emit = {type: 'COLLABROOM'; data: any}; + +const decideEmits = (pending: Pending[], batchEnabled: boolean): Emit[] => { + if (pending.length === 0) return []; + if (batchEnabled && pending.length > 1) { + return [{type: 'COLLABROOM', data: {type: 'NEW_CHANGES_BATCH', changes: pending}}]; + } + return pending.map((change) => ({ + type: 'COLLABROOM', + data: {type: 'NEW_CHANGES', ...change}, + })); +}; + +const fakePending = (n: number): Pending[] => + Array.from({length: n}, (_, i) => ({ + newRev: i + 1, changeset: `=${i}`, apool: {}, author: 'a.1', + currentTime: 1_000 * (i + 1), timeDelta: 1_000, + })); + +describe('NEW_CHANGES_BATCH emit decision', () => { + it('with flag OFF, sends one NEW_CHANGES per rev regardless of count', () => { + settings.newChangesBatch = false; + const emits = decideEmits(fakePending(5), settings.newChangesBatch); + expect(emits).toHaveLength(5); + expect(emits.every((e) => e.data.type === 'NEW_CHANGES')).toBe(true); + }); + + it('with flag ON and one queued rev, still sends NEW_CHANGES (no batch overhead)', () => { + settings.newChangesBatch = true; + const emits = decideEmits(fakePending(1), settings.newChangesBatch); + expect(emits).toHaveLength(1); + expect(emits[0]!.data.type).toBe('NEW_CHANGES'); + }); + + it('with flag ON and multiple queued revs, sends one NEW_CHANGES_BATCH', () => { + settings.newChangesBatch = true; + const emits = decideEmits(fakePending(5), settings.newChangesBatch); + expect(emits).toHaveLength(1); + expect(emits[0]!.data.type).toBe('NEW_CHANGES_BATCH'); + expect(emits[0]!.data.changes).toHaveLength(5); + expect(emits[0]!.data.changes[0]!.newRev).toBe(1); + expect(emits[0]!.data.changes[4]!.newRev).toBe(5); + }); + + it('empty pending list emits nothing', () => { + settings.newChangesBatch = true; + expect(decideEmits([], settings.newChangesBatch)).toEqual([]); + }); +}); 
From 816a4b379b0b3c5836374ab1ee6496ecae4ed3b2 Mon Sep 17 00:00:00 2001 From: John McLear Date: Fri, 15 May 2026 22:01:53 +0100 Subject: [PATCH 2/2] fix(new-changes-batch): address Qodo review on #7768 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two issues raised on the first push: 1. **Rev advanced before send (Bug · Correctness).** The previous diff advanced sessioninfo.rev/time inside the collect loop, before any emit ran. A concurrent updatePadClients() could then see the bumped rev and skip those revisions, and if the emit threw later, the skipped revs were lost forever. The client enforces strict newRev===rev+1 and silently stops applying on mismatch — net effect was a possible pad desync under concurrent fan-outs. Fix: snapshot startRev/startTime once, claim the (startRev, headRev] range by setting sessioninfo.rev = headRev immediately (so a concurrent run skips it), build the pending list against the local startTime, then emit. If the emit throws, roll sessioninfo.rev back to startRev so the next fan-out retries. Time is only committed after a successful send. 2. **Test re-implemented the decision (Rule violation · Reliability).** The original test re-implemented the NEW_CHANGES vs NEW_CHANGES_BATCH switch locally instead of exercising the production code. Removing the production logic would have left the test green. Fix: extract the pure wire-format decision into src/node/handler/NewChangesPacker.ts (no DB / pad dependency, so the test can import it directly under vitest), and rewrite the test to assert against the exported `buildNewChangesEmits` function from that module. PadMessageHandler now calls the same function; deleting it would fail the test. 9/9 tests across new-changes-batch + prom-instruments. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/node/handler/NewChangesPacker.ts | 41 +++++++++ src/node/handler/PadMessageHandler.ts | 83 ++++++++++--------- .../specs/new-changes-batch.test.ts | 63 +++++--------- 3 files changed, 103 insertions(+), 84 deletions(-) create mode 100644 src/node/handler/NewChangesPacker.ts diff --git a/src/node/handler/NewChangesPacker.ts b/src/node/handler/NewChangesPacker.ts new file mode 100644 index 00000000000..e952c343930 --- /dev/null +++ b/src/node/handler/NewChangesPacker.ts @@ -0,0 +1,41 @@ +// Wire-format decision for NEW_CHANGES vs NEW_CHANGES_BATCH (#7756 lever 3b). +// +// Lives in its own tiny module rather than inside PadMessageHandler so the +// pure decision can be unit-tested without standing up the full pad / DB / +// socket.io stack. PadMessageHandler.updatePadClients calls this function +// once per recipient with the queued revisions for that recipient. + +export type NewChangesItem = { + newRev: number; + changeset: string; + apool: unknown; + author: string; + currentTime: number; + timeDelta: number; +}; + +export type NewChangesEmit = + | {type: 'COLLABROOM'; data: {type: 'NEW_CHANGES'} & NewChangesItem} + | {type: 'COLLABROOM'; data: {type: 'NEW_CHANGES_BATCH'; changes: NewChangesItem[]}}; + +/** + * Decide what to put on the wire for one recipient. + * - No queued revisions: nothing. + * - Batching disabled, or exactly one rev: emit one NEW_CHANGES per rev + * (legacy behaviour; preserves bytes-on-wire for the steady state). + * - Batching enabled and multiple revs: emit one NEW_CHANGES_BATCH with + * the array of revisions. 
+ */ +export const buildNewChangesEmits = ( + pending: NewChangesItem[], + batchEnabled: boolean, +): NewChangesEmit[] => { + if (pending.length === 0) return []; + if (batchEnabled && pending.length > 1) { + return [{type: 'COLLABROOM', data: {type: 'NEW_CHANGES_BATCH', changes: pending}}]; + } + return pending.map((change) => ({ + type: 'COLLABROOM', + data: {type: 'NEW_CHANGES', ...change}, + } as NewChangesEmit)); +}; diff --git a/src/node/handler/PadMessageHandler.ts b/src/node/handler/PadMessageHandler.ts index 60b0d853702..43a000aa0d8 100644 --- a/src/node/handler/PadMessageHandler.ts +++ b/src/node/handler/PadMessageHandler.ts @@ -48,6 +48,7 @@ const hooks = require('../../static/js/pluginfw/hooks'); const stats = require('../stats') const assert = require('assert').strict; import {recordChangesetApply, recordSocketEmit} from '../prom-instruments'; +import {buildNewChangesEmits, type NewChangesItem} from './NewChangesPacker'; import {RateLimiterMemory} from 'rate-limiter-flexible'; import {ChangesetRequest, PadUserInfo, SocketClientRequest} from "../types/SocketClientRequest"; import {APool, AText, PadAuthor, PadType} from "../types/PadType"; @@ -977,6 +978,19 @@ exports.updatePadClients = async (pad: PadType) => { // The user might have disconnected since _getRoomSockets() was called. if (sessioninfo == null) return; + // Snapshot the local state so a concurrent updatePadClients() can't make + // us double-emit. We hold the "I'm responsible for revs (startRev, + // headRev]" claim by reading sessioninfo.rev once and overwriting it + // before any await. A second invocation arriving mid-loop will see the + // bumped rev and skip those revisions; if our emit fails the catch + // below rolls sessioninfo.rev back so they aren't lost. + const startRev = sessioninfo.rev; + const headRev = pad.getHeadRevisionNumber(); + if (startRev >= headRev) return; + const startTime = sessioninfo.time; + // Claim the range immediately so concurrent runs skip it. 
+ sessioninfo.rev = headRev; + // Collect all queued revisions for this socket. const pending: Array<{ newRev: number; @@ -987,50 +1001,39 @@ exports.updatePadClients = async (pad: PadType) => { timeDelta: number; }> = []; - while (sessioninfo.rev < pad.getHeadRevisionNumber()) { - const r = sessioninfo.rev + 1; - let revision = revCache[r]; - if (!revision) { - revision = await pad.getRevision(r); - revCache[r] = revision; - } - - const author = revision.meta.author; - const revChangeset = revision.changeset; - const currentTime = revision.meta.timestamp; - const forWire = prepareForWire(revChangeset, pad.pool); - - pending.push({ - newRev: r, - changeset: forWire.translated, - apool: forWire.pool, - author, - currentTime, - timeDelta: currentTime - sessioninfo.time, - }); - sessioninfo.time = currentTime; - sessioninfo.rev = r; - } - - if (pending.length === 0) return; - try { - if (batchEnabled && pending.length > 1) { - socket.emit('message', { - type: 'COLLABROOM', - data: {type: 'NEW_CHANGES_BATCH', changes: pending}, - }); - recordSocketEmit('NEW_CHANGES_BATCH'); - } else { - for (const change of pending) { - socket.emit('message', { - type: 'COLLABROOM', - data: {type: 'NEW_CHANGES', ...change}, - }); - recordSocketEmit('NEW_CHANGES'); + let previousTime = startTime; + for (let r = startRev + 1; r <= headRev; r++) { + let revision = revCache[r]; + if (!revision) { + revision = await pad.getRevision(r); + revCache[r] = revision; } + const author = revision.meta.author; + const revChangeset = revision.changeset; + const currentTime = revision.meta.timestamp; + const forWire = prepareForWire(revChangeset, pad.pool); + pending.push({ + newRev: r, + changeset: forWire.translated, + apool: forWire.pool, + author, + currentTime, + timeDelta: currentTime - previousTime, + }); + previousTime = currentTime; + } + + for (const emit of buildNewChangesEmits(pending, batchEnabled)) { + socket.emit('message', emit); + recordSocketEmit(emit.data.type); } + // Only after 
the wire send succeeds do we commit the new time. + sessioninfo.time = previousTime; } catch (err: any) { + // Roll back the claim so the next updatePadClients retries these revs. + // Only set rev back if no one else has advanced past us in the meantime. + if (sessioninfo.rev === headRev) sessioninfo.rev = startRev; messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`); } })); diff --git a/src/tests/backend-new/specs/new-changes-batch.test.ts b/src/tests/backend-new/specs/new-changes-batch.test.ts index 6c22099e528..b9fac90a0ff 100644 --- a/src/tests/backend-new/specs/new-changes-batch.test.ts +++ b/src/tests/backend-new/specs/new-changes-batch.test.ts @@ -1,73 +1,48 @@ -// Unit coverage for the NEW_CHANGES_BATCH server-side packing -// (#7756 lever 3b). Server-side concern only — verifies that the -// pad fan-out emits one batch per recipient when multiple revs queue -// up and the feature flag is on, and falls back to per-rev emits -// otherwise. Client-side coverage lives in the existing Playwright -// flow tests; this test pins the wire-format decision. +// Regression test for the NEW_CHANGES_BATCH wire-format decision +// (#7756 lever 3b). Imports the real implementation from +// NewChangesPacker so removing or breaking the production batching +// logic fails this test. import {describe, it, expect, beforeEach, afterEach} from 'vitest'; import settings from '../../../node/utils/Settings'; +import {buildNewChangesEmits, type NewChangesItem} from '../../../node/handler/NewChangesPacker'; const ORIGINAL_FLAG = settings.newChangesBatch; beforeEach(() => { settings.newChangesBatch = false; }); afterEach(() => { settings.newChangesBatch = ORIGINAL_FLAG; }); -// The decision the new code makes is small and pure: given a `pending` -// array of N >= 1 revisions and the feature flag, emit one -// NEW_CHANGES_BATCH (if N > 1 and flag on) or N NEW_CHANGES messages. 
-// Re-implement the decision here so the test doesn't have to stand up -// the full pad/DB stack — and pin it against the actual implementation -// via a comment in PadMessageHandler. - -type Pending = {newRev: number; changeset: string; apool: unknown; - author: string; currentTime: number; timeDelta: number}; -type Emit = {type: 'COLLABROOM'; data: any}; - -const decideEmits = (pending: Pending[], batchEnabled: boolean): Emit[] => { - if (pending.length === 0) return []; - if (batchEnabled && pending.length > 1) { - return [{type: 'COLLABROOM', data: {type: 'NEW_CHANGES_BATCH', changes: pending}}]; - } - return pending.map((change) => ({ - type: 'COLLABROOM', - data: {type: 'NEW_CHANGES', ...change}, - })); -}; - -const fakePending = (n: number): Pending[] => +const fakePending = (n: number): NewChangesItem[] => Array.from({length: n}, (_, i) => ({ newRev: i + 1, changeset: `=${i}`, apool: {}, author: 'a.1', currentTime: 1_000 * (i + 1), timeDelta: 1_000, })); -describe('NEW_CHANGES_BATCH emit decision', () => { - it('with flag OFF, sends one NEW_CHANGES per rev regardless of count', () => { - settings.newChangesBatch = false; - const emits = decideEmits(fakePending(5), settings.newChangesBatch); +describe('buildNewChangesEmits', () => { + it('flag OFF: one NEW_CHANGES per rev regardless of count', () => { + const emits = buildNewChangesEmits(fakePending(5), false); expect(emits).toHaveLength(5); expect(emits.every((e) => e.data.type === 'NEW_CHANGES')).toBe(true); }); - it('with flag ON and one queued rev, still sends NEW_CHANGES (no batch overhead)', () => { - settings.newChangesBatch = true; - const emits = decideEmits(fakePending(1), settings.newChangesBatch); + it('flag ON, single rev: still NEW_CHANGES (no batch overhead for the steady state)', () => { + const emits = buildNewChangesEmits(fakePending(1), true); expect(emits).toHaveLength(1); expect(emits[0]!.data.type).toBe('NEW_CHANGES'); }); - it('with flag ON and multiple queued revs, sends one 
NEW_CHANGES_BATCH', () => { - settings.newChangesBatch = true; - const emits = decideEmits(fakePending(5), settings.newChangesBatch); + it('flag ON, multiple revs: a single NEW_CHANGES_BATCH carrying all of them', () => { + const emits = buildNewChangesEmits(fakePending(5), true); expect(emits).toHaveLength(1); expect(emits[0]!.data.type).toBe('NEW_CHANGES_BATCH'); - expect(emits[0]!.data.changes).toHaveLength(5); - expect(emits[0]!.data.changes[0]!.newRev).toBe(1); - expect(emits[0]!.data.changes[4]!.newRev).toBe(5); + const batch = emits[0]!.data as {type: 'NEW_CHANGES_BATCH'; changes: NewChangesItem[]}; + expect(batch.changes).toHaveLength(5); + expect(batch.changes[0]!.newRev).toBe(1); + expect(batch.changes[4]!.newRev).toBe(5); }); it('empty pending list emits nothing', () => { - settings.newChangesBatch = true; - expect(decideEmits([], settings.newChangesBatch)).toEqual([]); + expect(buildNewChangesEmits([], true)).toEqual([]); + expect(buildNewChangesEmits([], false)).toEqual([]); }); });