Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 90 additions & 5 deletions packages/sdk/src/reliable_channel/reliable_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import {
MessageChannelEvent,
MessageChannelEvents,
type MessageChannelOptions,
type ParticipantId,
Message as SdsMessage,
type SenderId,
SyncMessage
} from "@waku/sds";
import { Logger } from "@waku/utils";
Expand All @@ -39,9 +39,11 @@ import { ISyncStatusEvents, SyncStatus } from "./sync_status.js";
const log = new Logger("sdk:reliable-channel");

const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds
const SYNC_INTERVAL_REPAIR_MULTIPLIER = 0.3; // Reduce sync interval when repairs pending
const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds
const DEFAULT_MAX_RETRY_ATTEMPTS = 10;
const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000;
const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds
const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000;

const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
Expand All @@ -51,6 +53,15 @@ const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
LightPushError.RLN_PROOF_GENERATION
];

/**
* Strategy for retrieving missing messages.
* - 'both': Use SDS-R peer repair and Store queries in parallel (default)
* - 'sds-r-only': Only use SDS-R peer repair
* - 'store-only': Only use Store queries (legacy behavior)
* - 'none': No automatic retrieval
*/
export type RetrievalStrategy = "both" | "sds-r-only" | "store-only" | "none";

export type ReliableChannelOptions = MessageChannelOptions & {
/**
* The minimum interval between 2 sync messages in the channel.
Expand Down Expand Up @@ -81,6 +92,7 @@ export type ReliableChannelOptions = MessageChannelOptions & {

/**
* How often store queries are done to retrieve missing messages.
* Only applies when retrievalStrategy includes Store ('both' or 'store-only').
*
* @default 10,000 (10 seconds)
*/
Expand Down Expand Up @@ -114,6 +126,13 @@ export type ReliableChannelOptions = MessageChannelOptions & {
* @default 1000 (1 second)
*/
processTaskMinElapseMs?: number;

/**
* Strategy for retrieving missing messages.
*
* @default 'both'
*/
retrievalStrategy?: RetrievalStrategy;
};

/**
Expand Down Expand Up @@ -152,6 +171,7 @@ export class ReliableChannel<
private syncRandomTimeout: RandomTimeout;
private sweepInBufInterval: ReturnType<typeof setInterval> | undefined;
private readonly sweepInBufIntervalMs: number;
private sweepRepairInterval: ReturnType<typeof setInterval> | undefined;
private processTaskTimeout: ReturnType<typeof setTimeout> | undefined;
private readonly retryManager: RetryManager | undefined;
private readonly missingMessageRetriever?: MissingMessageRetriever<T>;
Expand All @@ -165,6 +185,7 @@ export class ReliableChannel<
public messageChannel: MessageChannel,
private encoder: IEncoder,
private decoder: IDecoder<T>,
private retrievalStrategy: RetrievalStrategy,
options?: ReliableChannelOptions
) {
super();
Expand Down Expand Up @@ -226,7 +247,8 @@ export class ReliableChannel<
this.processTaskMinElapseMs =
options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS;

if (this._retrieve) {
// Only enable Store retrieval based on strategy
if (this._retrieve && this.shouldUseStore()) {
this.missingMessageRetriever = new MissingMessageRetriever(
this.decoder,
options?.retrieveFrequencyMs,
Expand Down Expand Up @@ -290,17 +312,26 @@ export class ReliableChannel<
public static async create<T extends IDecodedMessage>(
node: IWaku,
channelId: ChannelId,
senderId: SenderId,
senderId: ParticipantId,
encoder: IEncoder,
decoder: IDecoder<T>,
options?: ReliableChannelOptions
): Promise<ReliableChannel<T>> {
const sdsMessageChannel = new MessageChannel(channelId, senderId, options);
// Enable SDS-R repair only if retrieval strategy uses it
const retrievalStrategy = options?.retrievalStrategy ?? "both";
const enableRepair =
retrievalStrategy === "both" || retrievalStrategy === "sds-r-only";

const sdsMessageChannel = new MessageChannel(channelId, senderId, {
...options,
enableRepair
});
const messageChannel = new ReliableChannel(
node,
sdsMessageChannel,
encoder,
decoder,
retrievalStrategy,
options
);

Expand Down Expand Up @@ -455,6 +486,7 @@ export class ReliableChannel<
// missing messages or the status of previous outgoing messages
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);

// Remove from Store retriever if message was retrieved
this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId);

if (sdsMessage.content && sdsMessage.content.length > 0) {
Expand Down Expand Up @@ -528,6 +560,9 @@ export class ReliableChannel<
this.setupEventListeners();
this.restartSync();
this.startSweepIncomingBufferLoop();

this.startRepairSweepLoop();

if (this._retrieve) {
this.missingMessageRetriever?.start();
this.queryOnConnect?.start();
Expand All @@ -544,6 +579,7 @@ export class ReliableChannel<
this.removeAllEventListeners();
this.stopSync();
this.stopSweepIncomingBufferLoop();
this.stopRepairSweepLoop();
this.clearProcessTasks();

if (this.activePendingProcessTask) {
Expand Down Expand Up @@ -582,8 +618,55 @@ export class ReliableChannel<
}
}

private startRepairSweepLoop(): void {
if (!this.shouldUseSdsR()) {
return;
}
this.stopRepairSweepLoop();
this.sweepRepairInterval = setInterval(() => {
void this.messageChannel
.sweepRepairIncomingBuffer(async (message) => {
// Rebroadcast the repair message
const wakuMessage = { payload: message.encode() };
const result = await this._send(this.encoder, wakuMessage);
return result.failures.length === 0;
})
.catch((err) => {
log.error("error encountered when sweeping repair buffer", err);
});
}, DEFAULT_SWEEP_REPAIR_INTERVAL_MS);
}

private stopRepairSweepLoop(): void {
if (this.sweepRepairInterval) {
clearInterval(this.sweepRepairInterval);
this.sweepInBufInterval = undefined;
}
}

private shouldUseStore(): boolean {
return (
this.retrievalStrategy === "both" ||
this.retrievalStrategy === "store-only"
);
}

private shouldUseSdsR(): boolean {
return (
this.retrievalStrategy === "both" ||
this.retrievalStrategy === "sds-r-only"
);
}

private restartSync(multiplier: number = 1): void {
this.syncRandomTimeout.restart(multiplier);
// Adaptive sync: use shorter interval when repairs are pending
const hasPendingRepairs =
this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests();
const effectiveMultiplier = hasPendingRepairs
? multiplier * SYNC_INTERVAL_REPAIR_MULTIPLIER
: multiplier;

this.syncRandomTimeout.restart(effectiveMultiplier);
}

private stopSync(): void {
Expand Down Expand Up @@ -731,6 +814,8 @@ export class ReliableChannel<
);

for (const { messageId, retrievalHint } of event.detail) {
// Store retrieval (for 'both' and 'store-only' strategies)
// SDS-R repair happens automatically via RepairManager for 'both' and 'sds-r-only'
if (retrievalHint && this.missingMessageRetriever) {
this.missingMessageRetriever.addMissingMessage(
messageId,
Expand Down
10 changes: 0 additions & 10 deletions packages/sds/src/message_channel/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ export enum MessageChannelEvent {
InMessageLost = "sds:in:message-irretrievably-lost",
ErrorTask = "sds:error-task",
// SDS-R Repair Events
RepairRequestQueued = "sds:repair:request-queued",
RepairRequestSent = "sds:repair:request-sent",
RepairRequestReceived = "sds:repair:request-received",
RepairResponseQueued = "sds:repair:response-queued",
RepairResponseSent = "sds:repair:response-sent"
}

Expand All @@ -33,10 +31,6 @@ export type MessageChannelEvents = {
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
[MessageChannelEvent.ErrorTask]: CustomEvent<unknown>;
[MessageChannelEvent.RepairRequestQueued]: CustomEvent<{
messageId: MessageId;
tReq: number;
}>;
[MessageChannelEvent.RepairRequestSent]: CustomEvent<{
messageIds: MessageId[];
carrierMessageId: MessageId;
Expand All @@ -45,10 +39,6 @@ export type MessageChannelEvents = {
messageIds: MessageId[];
fromSenderId?: ParticipantId;
}>;
[MessageChannelEvent.RepairResponseQueued]: CustomEvent<{
messageId: MessageId;
tResp: number;
}>;
[MessageChannelEvent.RepairResponseSent]: CustomEvent<{
messageId: MessageId;
}>;
Expand Down
16 changes: 9 additions & 7 deletions packages/sds/src/message_channel/message_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,22 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {

// Only construct RepairManager if repair is enabled (default: true)
if (options.enableRepair ?? true) {
this.repairManager = new RepairManager(
senderId,
options.repairConfig,
(event: string, detail: unknown) => {
this.safeSendEvent(event as MessageChannelEvent, { detail });
}
);
this.repairManager = new RepairManager(senderId, options.repairConfig);
}
}

public static getMessageId(payload: Uint8Array): MessageId {
return bytesToHex(sha256(payload));
}

/**
* Check if there are pending repair requests that need to be sent.
* Useful for adaptive sync intervals - increase frequency when repairs pending.
*/
public hasPendingRepairRequests(currentTime = Date.now()): boolean {
return this.repairManager?.hasRequestsReady(currentTime) ?? false;
}

/**
* Processes all queued tasks sequentially to ensure proper message ordering.
*
Expand Down
37 changes: 11 additions & 26 deletions packages/sds/src/message_channel/repair/repair.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ const log = new Logger("sds:repair:manager");
*/
const PARTICIPANTS_PER_RESPONSE_GROUP = 128;

/**
* Event emitter callback for repair events
*/
export type RepairEventEmitter = (event: string, detail: unknown) => void;

/**
* Configuration for SDS-R repair protocol
*/
Expand Down Expand Up @@ -58,16 +53,10 @@ export class RepairManager {
private readonly config: Required<RepairConfig>;
private readonly outgoingBuffer: OutgoingRepairBuffer;
private readonly incomingBuffer: IncomingRepairBuffer;
private readonly eventEmitter?: RepairEventEmitter;

public constructor(
participantId: ParticipantId,
config: RepairConfig = {},
eventEmitter?: RepairEventEmitter
) {
public constructor(participantId: ParticipantId, config: RepairConfig = {}) {
this.participantId = participantId;
this.config = { ...DEFAULT_REPAIR_CONFIG, ...config };
this.eventEmitter = eventEmitter;

this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize);
this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize);
Expand Down Expand Up @@ -142,19 +131,13 @@ export class RepairManager {
// Calculate when to request this repair
const tReq = this.calculateTReq(entry.messageId, currentTime);

// Add to outgoing buffer - only log and emit event if actually added
// Add to outgoing buffer - only log if actually added
const wasAdded = this.outgoingBuffer.add(entry, tReq);

if (wasAdded) {
log.info(
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
);

// Emit event
this.eventEmitter?.("RepairRequestQueued", {
messageId: entry.messageId,
tReq
});
}
}
}
Expand Down Expand Up @@ -238,19 +221,13 @@ export class RepairManager {
currentTime
);

// Add to incoming buffer - only log and emit event if actually added
// Add to incoming buffer - only log if actually added
const wasAdded = this.incomingBuffer.add(request, tResp);

if (wasAdded) {
log.info(
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
);

// Emit event
this.eventEmitter?.("RepairResponseQueued", {
messageId: request.messageId,
tResp
});
}
}
}
Expand Down Expand Up @@ -328,4 +305,12 @@ export class RepairManager {
`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`
);
}

/**
* Check if there are repair requests ready to be sent
*/
public hasRequestsReady(currentTime = Date.now()): boolean {
const items = this.outgoingBuffer.getItems();
return items.length > 0 && items[0].tReq <= currentTime;
}
}
Loading