diff --git a/src/composables/useGetMessages.ts b/src/composables/useGetMessages.ts index dcbe84329bf..fbeedab57d0 100644 --- a/src/composables/useGetMessages.ts +++ b/src/composables/useGetMessages.ts @@ -20,7 +20,8 @@ import { subscribe, unsubscribe } from '@nextcloud/event-bus' import { computed, inject, onBeforeUnmount, provide, ref, watch } from 'vue' import { START_LOCATION, useRoute } from 'vue-router' import { useStore } from 'vuex' -import { CHAT, MESSAGE } from '../constants.ts' +import { CHAT, CONFIG, MESSAGE } from '../constants.ts' +import { getTalkConfig } from '../services/CapabilitiesManager.ts' import { EventBus } from '../services/EventBus.ts' import { useChatStore } from '../stores/chat.ts' import { useChatExtrasStore } from '../stores/chatExtras.ts' @@ -41,6 +42,8 @@ type GetMessagesContext = { } const GET_MESSAGES_CONTEXT_KEY: InjectionKey = Symbol.for('GET_MESSAGES_CONTEXT') +// TOREMOVE in main branch +const experimentalChatRelay = (getTalkConfig('local', 'experiments', 'enabled') ?? 0) & CONFIG.EXPERIMENTAL.CHAT_RELAY /** * Check whether caught error is from OCS API @@ -54,6 +57,7 @@ function isAxiosErrorResponse(exception: unknown): exception is AxiosError { @@ -173,8 +182,12 @@ export function useGetMessagesProvider() { unsubscribe('networkOnline', handleNetworkOnline) EventBus.off('route-change', onRouteChange) EventBus.off('set-context-id-to-bottom', setContextIdToBottom) + EventBus.off('signaling-message-received', addMessageFromChatRelay) + EventBus.off('signaling-supported-features', checkChatRelaySupport) + EventBus.off('should-refresh-chat-messages', tryAbortChatRelay) store.dispatch('cancelPollNewMessages', { requestId: currentToken.value }) + stopChatRelay() clearInterval(pollingTimeout) clearInterval(expirationInterval) }) @@ -195,6 +208,7 @@ export function useGetMessagesProvider() { if (currentToken.value) { console.debug('Canceling message request as we are offline') store.dispatch('cancelPollNewMessages', { requestId: currentToken.value }) + stopChatRelay() } } @@ -482,6 +496,10 @@ export function useGetMessagesProvider() { * @param token token of conversation where a method was called */ async function pollNewMessages(token: string) { + if (chatRelayEnabled) { + // Stop polling if chat relay is supported + return + } // Check that the token has not changed if (currentToken.value !== token) { console.debug(`token has changed to ${currentToken.value}, breaking the loop for ${token}`) @@ -499,6 +517,7 @@ export function useGetMessagesProvider() { requestId: token, }) debugTimer.end(`${token} | long polling`, 'status 200') + tryChatRelay() } catch (exception) { if (Axios.isCancel(exception)) { debugTimer.end(`${token} | long polling`, 'cancelled') @@ -512,6 +531,7 @@ export function useGetMessagesProvider() { // This is not an error, so reset error timeout and poll again pollingErrorTimeout = 1_000 clearTimeout(pollingTimeout) + tryChatRelay({ force: true }) pollingTimeout = setTimeout(() => { pollNewMessages(token) }, 500) @@ -539,6 +559,83 @@ export function useGetMessagesProvider() { }, 500) } + /** + * Try to start chat relay + * + * @param options + * @param options.force - to skip end reached check when it is guaranteed + */ + function tryChatRelay(options?: { force: boolean }) { + if (chatRelaySupported && (isChatEndReached.value || options?.force)) { + startChatRelay() + } + } + + /** + * Check whether chat relay is supported + * + * @param features + */ + function checkChatRelaySupport(features: string[]) { + if (experimentalChatRelay && features.includes('chat-relay')) { + chatRelaySupported = true + tryChatRelay() + } else { + chatRelaySupported = false + } + } + + /** + * Initialize chat relay support by stopping polling and listening to chat relay messages + */ + function startChatRelay() { + if (currentToken.value) { + // it might have been set already, ensure we cancel it + store.dispatch('cancelPollNewMessages', { requestId: currentToken.value }) + } + chatRelayEnabled = true + EventBus.on('signaling-message-received', addMessageFromChatRelay) + } + + /** + * Chat relay sends one message at a time, we update our stores directly + * + * @param payload + * @param payload.token + * @param payload.message + */ + function addMessageFromChatRelay(payload: { token: string, message: ChatMessage }) { + const { token, message } = payload + if (token !== currentToken.value) { + // Guard: Message is for another conversation + // e.g., user switched conversation while messages were in-flight + return + } + + chatStore.processChatBlocks(token, [message], { mergeBy: chatStore.getLastKnownId(token) }) + store.dispatch('processMessage', { token, message }) + } + + /** + * Stop chat relay and remove listener + */ + function stopChatRelay() { + chatRelayEnabled = false + EventBus.off('signaling-message-received', addMessageFromChatRelay) + } + + /** + * This is needed when something went wrong after starting chat relay + * and the server is no longer sending us messages events + * so we need to abort it to continue getting messages via polling + */ + function tryAbortChatRelay() { + if (chatRelayEnabled && chatRelaySupported) { + stopChatRelay() + pollNewMessages(currentToken.value) + } + } + provide(GET_MESSAGES_CONTEXT_KEY, { contextMessageId, loadingOldMessages, diff --git a/src/constants.ts b/src/constants.ts index 0e6ed1d84a7..0905cba3e5a 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -22,6 +22,11 @@ export const CONFIG = { * to allow join the call without page reload */ RECOVER_SESSION: 2, + /** + * Since 22.0.3 + * Send chat messages via the High performance-backend / websocket + */ + CHAT_RELAY: 4, }, } as const diff --git a/src/services/EventBus.ts b/src/services/EventBus.ts index 4e0c42ef129..3a91d9fc604 100644 --- a/src/services/EventBus.ts +++ b/src/services/EventBus.ts @@ -21,6 +21,7 @@ import mitt from 'mitt' export type Events = { [key: EventType]: unknown 'audio-player-ended': number + 'signaling-message-received': { token: string, message: ChatMessage } 'conversations-received': { singleConversation?: Conversation, fromBrowserStorage?: boolean } 'session-conflict-confirmation': string 'deleted-session-detected': void @@ -53,6 +54,7 @@ export type Events = { 'signaling-users-joined': [StandaloneSignalingJoinSession[]] 'signaling-users-left': [string[]] 'signaling-all-users-changed-in-call-to-disconnected': void + 'signaling-supported-features': string[] 'smart-picker-open': void 'switch-to-conversation': { token: string } 'talk:poll-added': { token: string, message: ChatMessage } diff --git a/src/utils/signaling.js b/src/utils/signaling.js index 007c8f5c18e..dc12e049770 100644 --- a/src/utils/signaling.js +++ b/src/utils/signaling.js @@ -1113,6 +1113,7 @@ Signaling.Standalone.prototype.helloResponseReceived = function(data) { for (i = 0; i < features.length; i++) { this.features[features[i]] = true } + this._trigger('supportedFeatures', features) } if (!this.settings.helloAuthParams.internal @@ -1430,8 +1431,12 @@ Signaling.Standalone.prototype.processRoomEvent = function(data) { Signaling.Standalone.prototype.processRoomMessageEvent = function(token, data) { switch (data.type) { case 'chat': - // FIXME this is not listened to - EventBus.emit('should-refresh-chat-messages') + if ('comment' in data.chat) { + EventBus.emit('signaling-message-received', { token, message: { ...data.chat.comment, token } }) + } else { + // TOREMOVE after HPB next release + EventBus.emit('should-refresh-chat-messages') + } break case 'recording': EventBus.emit('signaling-recording-status-changed', [token, data.recording.status])