Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 98 additions & 1 deletion src/composables/useGetMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import { subscribe, unsubscribe } from '@nextcloud/event-bus'
import { computed, inject, onBeforeUnmount, provide, ref, watch } from 'vue'
import { START_LOCATION, useRoute } from 'vue-router'
import { useStore } from 'vuex'
import { CHAT, MESSAGE } from '../constants.ts'
import { CHAT, CONFIG, MESSAGE } from '../constants.ts'
import { getTalkConfig } from '../services/CapabilitiesManager.ts'
import { EventBus } from '../services/EventBus.ts'
import { useChatStore } from '../stores/chat.ts'
import { useChatExtrasStore } from '../stores/chatExtras.ts'
Expand All @@ -41,6 +42,8 @@ type GetMessagesContext = {
}

const GET_MESSAGES_CONTEXT_KEY: InjectionKey<GetMessagesContext> = Symbol.for('GET_MESSAGES_CONTEXT')
// TOREMOVE in main branch
const experimentalChatRelay = (getTalkConfig('local', 'experiments', 'enabled') ?? 0) & CONFIG.EXPERIMENTAL.CHAT_RELAY

/**
* Check whether caught error is from OCS API
Expand All @@ -54,6 +57,7 @@ function isAxiosErrorResponse(exception: unknown): exception is AxiosError<strin
let pollingTimeout: NodeJS.Timeout | undefined
let expirationInterval: NodeJS.Timeout | undefined
let pollingErrorTimeout = 1_000
let chatRelaySupported = false

/**
* Composable to provide control logic for fetching messages list
Expand All @@ -74,6 +78,7 @@ export function useGetMessagesProvider() {
const loadingNewMessages = ref(false)
const isInitialisingMessages = ref(true)
const stopFetchingOldMessages = ref(false)
let chatRelayEnabled = false

/**
* Returns whether the current participant is a participant of current conversation.
Expand Down Expand Up @@ -144,12 +149,14 @@ export function useGetMessagesProvider() {
}
if (oldToken && oldToken !== newToken) {
store.dispatch('cancelPollNewMessages', { requestId: oldToken })
stopChatRelay()
}

if (newToken && canGetMessages) {
handleStartGettingMessagesPreconditions(newToken)
} else {
store.dispatch('cancelPollNewMessages', { requestId: newToken })
stopChatRelay()
}

/** Remove expired messages when joining a room */
Expand All @@ -162,6 +169,8 @@ export function useGetMessagesProvider() {
subscribe('networkOnline', handleNetworkOnline)
EventBus.on('route-change', onRouteChange)
EventBus.on('set-context-id-to-bottom', setContextIdToBottom)
EventBus.on('signaling-supported-features', checkChatRelaySupport)
EventBus.on('should-refresh-chat-messages', tryAbortChatRelay)

/** Every 30 seconds we remove expired messages from the store */
expirationInterval = setInterval(() => {
Expand All @@ -173,8 +182,12 @@ export function useGetMessagesProvider() {
unsubscribe('networkOnline', handleNetworkOnline)
EventBus.off('route-change', onRouteChange)
EventBus.off('set-context-id-to-bottom', setContextIdToBottom)
EventBus.off('signaling-message-received', addMessageFromChatRelay)
EventBus.off('signaling-supported-features', checkChatRelaySupport)
EventBus.off('should-refresh-chat-messages', tryAbortChatRelay)

store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
stopChatRelay()
clearInterval(pollingTimeout)
clearInterval(expirationInterval)
})
Expand All @@ -195,6 +208,7 @@ export function useGetMessagesProvider() {
if (currentToken.value) {
console.debug('Canceling message request as we are offline')
store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
stopChatRelay()
}
}

Expand Down Expand Up @@ -482,6 +496,10 @@ export function useGetMessagesProvider() {
* @param token token of conversation where a method was called
*/
async function pollNewMessages(token: string) {
if (chatRelayEnabled) {
// Stop polling if chat relay is supported
return
}
// Check that the token has not changed
if (currentToken.value !== token) {
console.debug(`token has changed to ${currentToken.value}, breaking the loop for ${token}`)
Expand All @@ -499,6 +517,7 @@ export function useGetMessagesProvider() {
requestId: token,
})
debugTimer.end(`${token} | long polling`, 'status 200')
tryChatRelay()
} catch (exception) {
if (Axios.isCancel(exception)) {
debugTimer.end(`${token} | long polling`, 'cancelled')
Expand All @@ -512,6 +531,7 @@ export function useGetMessagesProvider() {
// This is not an error, so reset error timeout and poll again
pollingErrorTimeout = 1_000
clearTimeout(pollingTimeout)
tryChatRelay({ force: true })
pollingTimeout = setTimeout(() => {
pollNewMessages(token)
}, 500)
Expand Down Expand Up @@ -539,6 +559,83 @@ export function useGetMessagesProvider() {
}, 500)
}

/**
* Try to start chat relay
*
* @param options
* @param options.force - to skip end reached check when it is guaranteed
*/
function tryChatRelay(options?: { force: boolean }) {
if (chatRelaySupported && (isChatEndReached.value || options?.force)) {
startChatRelay()
}
}

/**
* Check whether chat relay is supported
*
* @param features
*/
function checkChatRelaySupport(features: string[]) {
if (experimentalChatRelay && features.includes('chat-relay')) {
chatRelaySupported = true
tryChatRelay()
} else {
chatRelaySupported = false
}
}

/**
* Initialize chat relay support by stopping polling and listening to chat relay messages
*/
function startChatRelay() {
if (currentToken.value) {
// it might have been set already, ensure we cancel it
store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
}
chatRelayEnabled = true
EventBus.on('signaling-message-received', addMessageFromChatRelay)
}

/**
* Chat relay sends one message at a time, we update our stores directly
*
* @param payload
* @param payload.token
* @param payload.message
*/
function addMessageFromChatRelay(payload: { token: string, message: ChatMessage }) {
const { token, message } = payload
if (token !== currentToken.value) {
// Guard: Message is for another conversation
// e.g., user switched conversation while messages were in-flight
return
}

chatStore.processChatBlocks(token, [message], { mergeBy: chatStore.getLastKnownId(token) })
store.dispatch('processMessage', { token, message })
}

/**
* Stop chat relay and remove listener
*/
function stopChatRelay() {
chatRelayEnabled = false
EventBus.off('signaling-message-received', addMessageFromChatRelay)
}

/**
* This is needed when something went wrong after starting chat relay
* and the server is no longer sending us messages events
* so we need to abort it to continue getting messages via polling
*/
function tryAbortChatRelay() {
if (chatRelayEnabled && chatRelaySupported) {
stopChatRelay()
pollNewMessages(currentToken.value)
}
}

provide(GET_MESSAGES_CONTEXT_KEY, {
contextMessageId,
loadingOldMessages,
Expand Down
5 changes: 5 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ export const CONFIG = {
* to allow join the call without page reload
*/
RECOVER_SESSION: 2,
/**
* Since 22.0.3
* Send chat messages via the High performance-backend / websocket
*/
CHAT_RELAY: 4,
},
} as const

Expand Down
2 changes: 2 additions & 0 deletions src/services/EventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import mitt from 'mitt'
export type Events = {
[key: EventType]: unknown
'audio-player-ended': number
'signaling-message-received': { token: string, message: ChatMessage }
'conversations-received': { singleConversation?: Conversation, fromBrowserStorage?: boolean }
'session-conflict-confirmation': string
'deleted-session-detected': void
Expand Down Expand Up @@ -53,6 +54,7 @@ export type Events = {
'signaling-users-joined': [StandaloneSignalingJoinSession[]]
'signaling-users-left': [string[]]
'signaling-all-users-changed-in-call-to-disconnected': void
'signaling-supported-features': string[]
'smart-picker-open': void
'switch-to-conversation': { token: string }
'talk:poll-added': { token: string, message: ChatMessage }
Expand Down
9 changes: 7 additions & 2 deletions src/utils/signaling.js
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,7 @@ Signaling.Standalone.prototype.helloResponseReceived = function(data) {
for (i = 0; i < features.length; i++) {
this.features[features[i]] = true
}
this._trigger('supportedFeatures', features)
}

if (!this.settings.helloAuthParams.internal
Expand Down Expand Up @@ -1430,8 +1431,12 @@ Signaling.Standalone.prototype.processRoomEvent = function(data) {
Signaling.Standalone.prototype.processRoomMessageEvent = function(token, data) {
switch (data.type) {
case 'chat':
// FIXME this is not listened to
EventBus.emit('should-refresh-chat-messages')
if ('comment' in data.chat) {
EventBus.emit('signaling-message-received', { token, message: { ...data.chat.comment, token } })
} else {
// TOREMOVE after HPB next release
EventBus.emit('should-refresh-chat-messages')
}
break
case 'recording':
EventBus.emit('signaling-recording-status-changed', [token, data.recording.status])
Expand Down