Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions settings.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,18 @@
*/
"loadTest": false,

/*
* Pack multiple NEW_CHANGES revisions into a single NEW_CHANGES_BATCH emit
* per recipient when a fan-out has more than one revision to catch up
* (#7756 lever 3b). Reduces engine.io packet count under high pad
* concurrency, especially on the WebSocket transport.
*
* WARNING: enabling this requires all connected clients to understand
* NEW_CHANGES_BATCH. Old clients will silently fail to apply batched
* revisions. Coordinate the rollout.
*/
"newChangesBatch": false,

/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
41 changes: 41 additions & 0 deletions src/node/handler/NewChangesPacker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Wire-format decision for NEW_CHANGES vs NEW_CHANGES_BATCH (#7756 lever 3b).
//
// Lives in its own tiny module rather than inside PadMessageHandler so the
// pure decision can be unit-tested without standing up the full pad / DB /
// socket.io stack. PadMessageHandler.updatePadClients calls this function
// once per recipient with the queued revisions for that recipient.

/**
 * One revision as it travels on the wire: either inlined into a NEW_CHANGES
 * message or as one element of a NEW_CHANGES_BATCH `changes` array.
 */
export type NewChangesItem = {
  newRev: number;
  changeset: string;
  apool: unknown;
  author: string;
  currentTime: number;
  timeDelta: number;
};

/**
 * A COLLABROOM message carrying either a single revision (legacy
 * NEW_CHANGES, fields inlined next to `type`) or several revisions at once
 * (NEW_CHANGES_BATCH, revisions under `changes`). `data.type` is the
 * discriminant.
 */
export type NewChangesEmit =
  | {type: 'COLLABROOM'; data: {type: 'NEW_CHANGES'} & NewChangesItem}
  | {type: 'COLLABROOM'; data: {type: 'NEW_CHANGES_BATCH'; changes: NewChangesItem[]}};

/**
 * Decide what to put on the wire for one recipient.
 *  - No queued revisions: nothing.
 *  - Batching disabled, or exactly one rev: emit one NEW_CHANGES per rev
 *    (legacy behaviour; preserves bytes-on-wire for the steady state).
 *  - Batching enabled and multiple revs: emit one NEW_CHANGES_BATCH with
 *    the array of revisions.
 *
 * @param pending - Queued revisions for the recipient, in the order they
 *   should be applied. The array is never mutated.
 * @param batchEnabled - Whether multiple revisions may be packed into one
 *   NEW_CHANGES_BATCH message.
 * @returns The messages to emit, in order. The returned structure does not
 *   share its arrays with `pending`.
 */
export const buildNewChangesEmits = (
  pending: readonly NewChangesItem[],
  batchEnabled: boolean,
): NewChangesEmit[] => {
  if (pending.length === 0) return [];
  if (batchEnabled && pending.length > 1) {
    // Shallow-copy so that a caller reusing or mutating `pending` afterwards
    // cannot change a batch that has already been built.
    return [{type: 'COLLABROOM', data: {type: 'NEW_CHANGES_BATCH', changes: [...pending]}}];
  }
  // The annotated return type makes the compiler check each message against
  // NewChangesEmit, instead of silencing it with an `as` assertion.
  return pending.map((change): NewChangesEmit => ({
    type: 'COLLABROOM',
    data: {type: 'NEW_CHANGES', ...change},
  }));
};
86 changes: 59 additions & 27 deletions src/node/handler/PadMessageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const hooks = require('../../static/js/pluginfw/hooks');
const stats = require('../stats')
const assert = require('assert').strict;
import {recordChangesetApply, recordSocketEmit} from '../prom-instruments';
import {buildNewChangesEmits, type NewChangesItem} from './NewChangesPacker';
import {RateLimiterMemory} from 'rate-limiter-flexible';
import {ChangesetRequest, PadUserInfo, SocketClientRequest} from "../types/SocketClientRequest";
import {APool, AText, PadAuthor, PadType} from "../types/PadType";
Expand Down Expand Up @@ -964,45 +965,76 @@ exports.updatePadClients = async (pad: PadType) => {
// but benefit of reusing cached revision object is HUGE
const revCache:MapArrayType<any> = {};

// When `settings.newChangesBatch` is true and a recipient is more than one
// revision behind, pack the queued revisions into a single NEW_CHANGES_BATCH
// emit per recipient. The engine.io WebSocket transport sends one frame per
// packet (the polling transport already batches at the HTTP-response layer),
// so reducing the packet count translates directly into fewer system calls
// on the server and fewer onmessage callbacks on the client.
const batchEnabled = settings.newChangesBatch === true;

await Promise.all(roomSockets.map(async (socket) => {
const sessioninfo = sessioninfos[socket.id];
// The user might have disconnected since _getRoomSockets() was called.
if (sessioninfo == null) return;

while (sessioninfo.rev < pad.getHeadRevisionNumber()) {
const r = sessioninfo.rev + 1;
let revision = revCache[r];
if (!revision) {
revision = await pad.getRevision(r);
revCache[r] = revision;
}

const author = revision.meta.author;
const revChangeset = revision.changeset;
const currentTime = revision.meta.timestamp;
// Snapshot the local state so a concurrent updatePadClients() can't make
// us double-emit. We hold the "I'm responsible for revs (startRev,
// headRev]" claim by reading sessioninfo.rev once and overwriting it
// before any await. A second invocation arriving mid-loop will see the
// bumped rev and skip those revisions; if our emit fails the catch
// below rolls sessioninfo.rev back so they aren't lost.
const startRev = sessioninfo.rev;
const headRev = pad.getHeadRevisionNumber();
if (startRev >= headRev) return;
const startTime = sessioninfo.time;
// Claim the range immediately so concurrent runs skip it.
sessioninfo.rev = headRev;

// Collect all queued revisions for this socket.
const pending: Array<{
newRev: number;
changeset: string;
apool: unknown;
author: string;
currentTime: number;
timeDelta: number;
}> = [];

const forWire = prepareForWire(revChangeset, pad.pool);
const msg = {
type: 'COLLABROOM',
data: {
type: 'NEW_CHANGES',
try {
let previousTime = startTime;
for (let r = startRev + 1; r <= headRev; r++) {
let revision = revCache[r];
if (!revision) {
revision = await pad.getRevision(r);
revCache[r] = revision;
}
const author = revision.meta.author;
const revChangeset = revision.changeset;
const currentTime = revision.meta.timestamp;
const forWire = prepareForWire(revChangeset, pad.pool);
pending.push({
newRev: r,
changeset: forWire.translated,
apool: forWire.pool,
author,
currentTime,
timeDelta: currentTime - sessioninfo.time,
},
};
try {
socket.emit('message', msg);
recordSocketEmit('NEW_CHANGES');
} catch (err:any) {
messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`);
return;
timeDelta: currentTime - previousTime,
});
previousTime = currentTime;
}

for (const emit of buildNewChangesEmits(pending, batchEnabled)) {
socket.emit('message', emit);
recordSocketEmit(emit.data.type);
}
sessioninfo.time = currentTime;
sessioninfo.rev = r;
// Only after the wire send succeeds do we commit the new time.
sessioninfo.time = previousTime;
} catch (err: any) {
// Roll back the claim so the next updatePadClients retries these revs.
// Only set rev back if no one else has advanced past us in the meantime.
if (sessioninfo.rev === headRev) sessioninfo.rev = startRev;
messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`);
}
}));
};
Expand Down
1 change: 1 addition & 0 deletions src/node/prom-instruments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export const padUsersGauge = new client.Gauge({
// state.
const KNOWN_TYPES = new Set([
'NEW_CHANGES',
'NEW_CHANGES_BATCH',
'ACCEPT_COMMIT',
'CHAT_MESSAGE',
'CLIENT_VARS',
Expand Down
16 changes: 16 additions & 0 deletions src/node/utils/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ export type SettingsType = {
automaticReconnectionTimeout: number,
loadTest: boolean,
scalingDiveMetrics: boolean,
newChangesBatch: boolean,
dumpOnUncleanExit: boolean,
indentationOnNewLine: boolean,
logconfig: any | null,
Expand Down Expand Up @@ -658,6 +659,21 @@ const settings: SettingsType = {
* production deployments aren't paying for instrumentation they don't use.
*/
scalingDiveMetrics: false,
/**
* Pack multiple NEW_CHANGES revisions into a single NEW_CHANGES_BATCH emit
* per recipient when a fan-out has more than one revision to catch up
* (#7756 lever 3b). Reduces engine.io packet count under high pad
* concurrency, especially on the WebSocket transport, which sends one
* frame per packet (the polling transport already batches naturally).
*
* Requires clients to recognise the NEW_CHANGES_BATCH message type. New
* clients are forward-compatible (they handle both NEW_CHANGES and
* NEW_CHANGES_BATCH). Old clients connecting to a server with this enabled
* would fail to apply batched revisions, so coordinate the rollout.
*
* false (default) preserves the legacy per-revision emit behaviour.
*/
newChangesBatch: false,
/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
28 changes: 17 additions & 11 deletions src/static/js/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,17 +493,23 @@ const loadBroadcastJS = (socket, sendSocketMsg, fireWhenAllScriptsAreLoaded, Bro
if (obj.type === 'COLLABROOM') {
obj = obj.data;

if (obj.type === 'NEW_CHANGES') {
const changeset = moveOpsToNewPool(
obj.changeset, (new AttribPool()).fromJsonable(obj.apool), padContents.apool);

let changesetBack = inverse(
obj.changeset, padContents.currentLines, padContents.alines, padContents.apool);

changesetBack = moveOpsToNewPool(
changesetBack, (new AttribPool()).fromJsonable(obj.apool), padContents.apool);

loadedNewChangeset(changeset, changesetBack, obj.newRev - 1, obj.timeDelta);
if (obj.type === 'NEW_CHANGES' || obj.type === 'NEW_CHANGES_BATCH') {
// NEW_CHANGES_BATCH (#7756 lever 3b) carries an array of revisions
// in one emit. Each revision has the same shape as the legacy
// single-rev message; apply in order.
const changes = obj.type === 'NEW_CHANGES_BATCH' ? obj.changes : [obj];
for (const change of changes) {
const changeset = moveOpsToNewPool(
change.changeset, (new AttribPool()).fromJsonable(change.apool), padContents.apool);

let changesetBack = inverse(
change.changeset, padContents.currentLines, padContents.alines, padContents.apool);

changesetBack = moveOpsToNewPool(
changesetBack, (new AttribPool()).fromJsonable(change.apool), padContents.apool);

loadedNewChangeset(changeset, changesetBack, change.newRev - 1, change.timeDelta);
}
} else if (obj.type === 'NEW_AUTHORDATA') {
const authorMap = {};
authorMap[obj.author] = obj.data;
Expand Down
23 changes: 15 additions & 8 deletions src/static/js/collab_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
if (wrapper.type !== 'COLLABROOM' && wrapper.type !== 'CUSTOM') return;
const msg = wrapper.data;

if (msg.type === 'NEW_CHANGES') {
if (msg.type === 'NEW_CHANGES' || msg.type === 'NEW_CHANGES_BATCH') {
// NEW_CHANGES_BATCH (added in #7756 lever 3b) carries an array of
// revisions in one emit. Each revision has the same shape as the
// legacy single-rev message, so we normalise to a list and apply in
// order, sharing the same compose-safety await.
const changes = msg.type === 'NEW_CHANGES_BATCH' ? msg.changes : [msg];
serverMessageTaskQueue.enqueue(async () => {
// Avoid updating the DOM while the user is composing a character. Notes about this `await`:
// * `await null;` is equivalent to `await Promise.resolve(null);`, so if the user is not
Expand All @@ -198,14 +203,16 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
// possible, that the chances are so small or the consequences so minor that it's not
// worth addressing).
await editor.getInInternationalComposition();
const {newRev, changeset, author = '', apool} = msg;
if (newRev !== (rev + 1)) {
window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${rev + 1}`);
// setChannelState("DISCONNECTED", "badmessage_newchanges");
return;
for (const change of changes) {
const {newRev, changeset, author = '', apool} = change;
if (newRev !== (rev + 1)) {
window.console.warn(`bad message revision on ${msg.type}: ${newRev} not ${rev + 1}`);
// setChannelState("DISCONNECTED", "badmessage_newchanges");
return;
}
rev = newRev;
editor.applyChangesToBase(changeset, author, apool);
}
rev = newRev;
editor.applyChangesToBase(changeset, author, apool);
});
} else if (msg.type === 'ACCEPT_COMMIT') {
serverMessageTaskQueue.enqueue(() => {
Expand Down
14 changes: 14 additions & 0 deletions src/static/js/types/SocketIOMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ export type ClientNewChanges = {
payload?: ClientNewChanges
}

export type NewChangesItem = {
apool: AttributePool,
author: string,
changeset: string,
currentTime: number,
newRev: number,
timeDelta: number,
}

export type ClientNewChangesBatch = {
type: 'NEW_CHANGES_BATCH',
changes: NewChangesItem[],
}

export type ClientAcceptCommitMessage = {
type: 'ACCEPT_COMMIT'
newRev: number
Expand Down
48 changes: 48 additions & 0 deletions src/tests/backend-new/specs/new-changes-batch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Regression test for the NEW_CHANGES_BATCH wire-format decision
// (#7756 lever 3b). Imports the real implementation from
// NewChangesPacker so removing or breaking the production batching
// logic fails this test.

import {describe, it, expect, beforeEach, afterEach} from 'vitest';
import settings from '../../../node/utils/Settings';
import {buildNewChangesEmits, type NewChangesItem} from '../../../node/handler/NewChangesPacker';

// Remember the value the settings module was loaded with so that it can be
// restored after each test; every test starts with batching switched off.
const savedNewChangesBatch = settings.newChangesBatch;

beforeEach(() => {
  settings.newChangesBatch = false;
});

afterEach(() => {
  settings.newChangesBatch = savedNewChangesBatch;
});

/**
 * Build `n` synthetic queued revisions numbered 1..n. Revision k carries
 * changeset `=${k - 1}`, an empty pool object, author 'a.1', a currentTime of
 * k * 1000 and a constant timeDelta of 1000.
 */
const fakePending = (n: number): NewChangesItem[] => {
  const items: NewChangesItem[] = [];
  for (let rev = 1; rev <= n; rev++) {
    items.push({
      newRev: rev,
      changeset: `=${rev - 1}`,
      apool: {},
      author: 'a.1',
      currentTime: rev * 1_000,
      timeDelta: 1_000,
    });
  }
  return items;
};

// Covers the four wire-format outcomes of buildNewChangesEmits: batching
// off, batching on with a single rev, batching on with several revs, and an
// empty queue.
describe('buildNewChangesEmits', () => {
  it('flag OFF: one NEW_CHANGES per rev regardless of count', () => {
    const pending = fakePending(5);
    const emits = buildNewChangesEmits(pending, false);
    expect(emits).toHaveLength(5);
    // Each rev must travel in its own legacy message, in queue order, with
    // its fields inlined unchanged next to the message type.
    expect(emits).toEqual(pending.map((change) => ({
      type: 'COLLABROOM',
      data: {type: 'NEW_CHANGES', ...change},
    })));
  });

  it('flag ON, single rev: still NEW_CHANGES (no batch overhead for the steady state)', () => {
    const emits = buildNewChangesEmits(fakePending(1), true);
    expect(emits).toHaveLength(1);
    expect(emits[0]!.data.type).toBe('NEW_CHANGES');
  });

  it('flag ON, multiple revs: a single NEW_CHANGES_BATCH carrying all of them', () => {
    const emits = buildNewChangesEmits(fakePending(5), true);
    expect(emits).toHaveLength(1);
    const {data} = emits[0]!;
    // Narrow on the discriminant rather than casting, so a wrong message
    // type fails here with a clear error instead of being masked.
    if (data.type !== 'NEW_CHANGES_BATCH') {
      throw new Error(`expected NEW_CHANGES_BATCH, got ${data.type}`);
    }
    // All revisions must be present and in ascending order.
    expect(data.changes.map((change) => change.newRev)).toEqual([1, 2, 3, 4, 5]);
  });

  it('empty pending list emits nothing', () => {
    expect(buildNewChangesEmits([], true)).toEqual([]);
    expect(buildNewChangesEmits([], false)).toEqual([]);
  });
});
Loading