diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index ebcd849984..33f8da2829 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -19,8 +19,8 @@ import { MessageChannelEvent, MessageChannelEvents, type MessageChannelOptions, + type ParticipantId, Message as SdsMessage, - type SenderId, SyncMessage } from "@waku/sds"; import { Logger } from "@waku/utils"; @@ -39,9 +39,11 @@ import { ISyncStatusEvents, SyncStatus } from "./sync_status.js"; const log = new Logger("sdk:reliable-channel"); const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds +const SYNC_INTERVAL_REPAIR_MULTIPLIER = 0.3; // Reduce sync interval when repairs pending const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds const DEFAULT_MAX_RETRY_ATTEMPTS = 10; const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000; +const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000; const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [ @@ -51,6 +53,15 @@ const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [ LightPushError.RLN_PROOF_GENERATION ]; +/** + * Strategy for retrieving missing messages. + * - 'both': Use SDS-R peer repair and Store queries in parallel (default) + * - 'sds-r-only': Only use SDS-R peer repair + * - 'store-only': Only use Store queries (legacy behavior) + * - 'none': No automatic retrieval + */ +export type RetrievalStrategy = "both" | "sds-r-only" | "store-only" | "none"; + export type ReliableChannelOptions = MessageChannelOptions & { /** * The minimum interval between 2 sync messages in the channel. @@ -81,6 +92,7 @@ export type ReliableChannelOptions = MessageChannelOptions & { /** * How often store queries are done to retrieve missing messages. + * Only applies when retrievalStrategy includes Store ('both' or 'store-only'). * * @default 10,000 (10 seconds) */ @@ -114,6 +126,13 @@ export type ReliableChannelOptions = MessageChannelOptions & { * @default 1000 (1 second) */ processTaskMinElapseMs?: number; + + /** + * Strategy for retrieving missing messages. + * + * @default 'both' + */ + retrievalStrategy?: RetrievalStrategy; }; /** @@ -152,6 +171,7 @@ export class ReliableChannel< private syncRandomTimeout: RandomTimeout; private sweepInBufInterval: ReturnType | undefined; private readonly sweepInBufIntervalMs: number; + private sweepRepairInterval: ReturnType | undefined; private processTaskTimeout: ReturnType | undefined; private readonly retryManager: RetryManager | undefined; private readonly missingMessageRetriever?: MissingMessageRetriever; @@ -165,6 +185,7 @@ export class ReliableChannel< public messageChannel: MessageChannel, private encoder: IEncoder, private decoder: IDecoder, + private retrievalStrategy: RetrievalStrategy, options?: ReliableChannelOptions ) { super(); @@ -226,7 +247,8 @@ export class ReliableChannel< this.processTaskMinElapseMs = options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS; - if (this._retrieve) { + // Only enable Store retrieval based on strategy + if (this._retrieve && this.shouldUseStore()) { this.missingMessageRetriever = new MissingMessageRetriever( this.decoder, options?.retrieveFrequencyMs, @@ -290,17 +312,26 @@ export class ReliableChannel< public static async create( node: IWaku, channelId: ChannelId, - senderId: SenderId, + senderId: ParticipantId, encoder: IEncoder, decoder: IDecoder, options?: ReliableChannelOptions ): Promise> { - const sdsMessageChannel = new MessageChannel(channelId, senderId, options); + // Enable SDS-R repair only if retrieval strategy uses it + const retrievalStrategy = options?.retrievalStrategy ?? "both"; + const enableRepair = + retrievalStrategy === "both" || retrievalStrategy === "sds-r-only"; + + const sdsMessageChannel = new MessageChannel(channelId, senderId, { + ...options, + enableRepair + }); const messageChannel = new ReliableChannel( node, sdsMessageChannel, encoder, decoder, + retrievalStrategy, options ); @@ -455,6 +486,7 @@ export class ReliableChannel< // missing messages or the status of previous outgoing messages this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint); + // Remove from Store retriever if message was retrieved this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); if (sdsMessage.content && sdsMessage.content.length > 0) { @@ -528,6 +560,9 @@ export class ReliableChannel< this.setupEventListeners(); this.restartSync(); this.startSweepIncomingBufferLoop(); + + this.startRepairSweepLoop(); + if (this._retrieve) { this.missingMessageRetriever?.start(); this.queryOnConnect?.start(); @@ -544,6 +579,7 @@ export class ReliableChannel< this.removeAllEventListeners(); this.stopSync(); this.stopSweepIncomingBufferLoop(); + this.stopRepairSweepLoop(); this.clearProcessTasks(); if (this.activePendingProcessTask) { @@ -582,8 +618,55 @@ export class ReliableChannel< } } + private startRepairSweepLoop(): void { + if (!this.shouldUseSdsR()) { + return; + } + this.stopRepairSweepLoop(); + this.sweepRepairInterval = setInterval(() => { + void this.messageChannel + .sweepRepairIncomingBuffer(async (message) => { + // Rebroadcast the repair message + const wakuMessage = { payload: message.encode() }; + const result = await this._send(this.encoder, wakuMessage); + return result.failures.length === 0; + }) + .catch((err) => { + log.error("error encountered when sweeping repair buffer", err); + }); + }, DEFAULT_SWEEP_REPAIR_INTERVAL_MS); + } + + private stopRepairSweepLoop(): void { + if (this.sweepRepairInterval) { + clearInterval(this.sweepRepairInterval); + this.sweepInBufInterval = undefined; + } + } + + private shouldUseStore(): boolean { + return ( + this.retrievalStrategy === "both" || + this.retrievalStrategy === "store-only" + ); + } + + private shouldUseSdsR(): boolean { + return ( + this.retrievalStrategy === "both" || + this.retrievalStrategy === "sds-r-only" + ); + } + private restartSync(multiplier: number = 1): void { - this.syncRandomTimeout.restart(multiplier); + // Adaptive sync: use shorter interval when repairs are pending + const hasPendingRepairs = + this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests(); + const effectiveMultiplier = hasPendingRepairs + ? multiplier * SYNC_INTERVAL_REPAIR_MULTIPLIER + : multiplier; + + this.syncRandomTimeout.restart(effectiveMultiplier); } private stopSync(): void { @@ -731,6 +814,8 @@ export class ReliableChannel< ); for (const { messageId, retrievalHint } of event.detail) { + // Store retrieval (for 'both' and 'store-only' strategies) + // SDS-R repair happens automatically via RepairManager for 'both' and 'sds-r-only' if (retrievalHint && this.missingMessageRetriever) { this.missingMessageRetriever.addMissingMessage( messageId, diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts index ecc2a55edc..aa088d720b 100644 --- a/packages/sds/src/message_channel/events.ts +++ b/packages/sds/src/message_channel/events.ts @@ -12,10 +12,8 @@ export enum MessageChannelEvent { InMessageLost = "sds:in:message-irretrievably-lost", ErrorTask = "sds:error-task", // SDS-R Repair Events - RepairRequestQueued = "sds:repair:request-queued", RepairRequestSent = "sds:repair:request-sent", RepairRequestReceived = "sds:repair:request-received", - RepairResponseQueued = "sds:repair:response-queued", RepairResponseSent = "sds:repair:response-sent" } @@ -33,10 +31,6 @@ export type MessageChannelEvents = { [MessageChannelEvent.OutSyncSent]: CustomEvent; [MessageChannelEvent.InSyncReceived]: CustomEvent; [MessageChannelEvent.ErrorTask]: CustomEvent; - [MessageChannelEvent.RepairRequestQueued]: CustomEvent<{ - messageId: MessageId; - tReq: number; - }>; [MessageChannelEvent.RepairRequestSent]: CustomEvent<{ messageIds: MessageId[]; carrierMessageId: MessageId; @@ -45,10 +39,6 @@ export type MessageChannelEvents = { messageIds: MessageId[]; fromSenderId?: ParticipantId; }>; - [MessageChannelEvent.RepairResponseQueued]: CustomEvent<{ - messageId: MessageId; - tResp: number; - }>; [MessageChannelEvent.RepairResponseSent]: CustomEvent<{ messageId: MessageId; }>; diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 36919bbce2..eccc1435b9 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -128,13 +128,7 @@ export class MessageChannel extends TypedEventEmitter { // Only construct RepairManager if repair is enabled (default: true) if (options.enableRepair ?? true) { - this.repairManager = new RepairManager( - senderId, - options.repairConfig, - (event: string, detail: unknown) => { - this.safeSendEvent(event as MessageChannelEvent, { detail }); - } - ); + this.repairManager = new RepairManager(senderId, options.repairConfig); } } @@ -142,6 +136,14 @@ export class MessageChannel extends TypedEventEmitter { return bytesToHex(sha256(payload)); } + /** + * Check if there are pending repair requests that need to be sent. + * Useful for adaptive sync intervals - increase frequency when repairs pending. + */ + public hasPendingRepairRequests(currentTime = Date.now()): boolean { + return this.repairManager?.hasRequestsReady(currentTime) ?? false; + } + /** * Processes all queued tasks sequentially to ensure proper message ordering. * diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 4207483165..1a8a85f43e 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -20,11 +20,6 @@ const log = new Logger("sds:repair:manager"); */ const PARTICIPANTS_PER_RESPONSE_GROUP = 128; -/** - * Event emitter callback for repair events - */ -export type RepairEventEmitter = (event: string, detail: unknown) => void; - /** * Configuration for SDS-R repair protocol */ @@ -58,16 +53,10 @@ export class RepairManager { private readonly config: Required; private readonly outgoingBuffer: OutgoingRepairBuffer; private readonly incomingBuffer: IncomingRepairBuffer; - private readonly eventEmitter?: RepairEventEmitter; - public constructor( - participantId: ParticipantId, - config: RepairConfig = {}, - eventEmitter?: RepairEventEmitter - ) { + public constructor(participantId: ParticipantId, config: RepairConfig = {}) { this.participantId = participantId; this.config = { ...DEFAULT_REPAIR_CONFIG, ...config }; - this.eventEmitter = eventEmitter; this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize); this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize); @@ -142,19 +131,13 @@ export class RepairManager { // Calculate when to request this repair const tReq = this.calculateTReq(entry.messageId, currentTime); - // Add to outgoing buffer - only log and emit event if actually added + // Add to outgoing buffer - only log if actually added const wasAdded = this.outgoingBuffer.add(entry, tReq); if (wasAdded) { log.info( `Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}` ); - - // Emit event - this.eventEmitter?.("RepairRequestQueued", { - messageId: entry.messageId, - tReq - }); } } } @@ -238,19 +221,13 @@ export class RepairManager { currentTime ); - // Add to incoming buffer - only log and emit event if actually added + // Add to incoming buffer - only log if actually added const wasAdded = this.incomingBuffer.add(request, tResp); if (wasAdded) { log.info( `Will respond to repair request for ${request.messageId} at T_resp=${tResp}` ); - - // Emit event - this.eventEmitter?.("RepairResponseQueued", { - messageId: request.messageId, - tResp - }); } } } @@ -328,4 +305,12 @@ export class RepairManager { `Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants` ); } + + /** + * Check if there are repair requests ready to be sent + */ + public hasRequestsReady(currentTime = Date.now()): boolean { + const items = this.outgoingBuffer.getItems(); + return items.length > 0 && items[0].tReq <= currentTime; + } }