Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions packages/client/src/adapters/message-port/rpc-link.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,24 @@ describe('rpcLink', () => {

await promise
})

it('ignore invalid messages', async () => {
const port = createPort()
const orpc = createORPCClient(new RPCLink({ port })) as any

const promise = expect(orpc.ping('input')).resolves.toEqual('pong')

await vi.waitFor(() => expect(port.postMessage).toHaveBeenCalledTimes(1))

const decoded = decodeRequest(port.postMessage.mock.calls[0]![0])
const id = decoded.message.id

// Invalid message — should be ignored
onMessage({ data: { invalid: true } })

// Correct message — should be processed
onMessage({ data: await createResponseMessage({ id }) })

await promise
})
})
40 changes: 26 additions & 14 deletions packages/client/src/adapters/message-port/transport.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type { Promisable, Value } from '@orpc/shared'
import type { StandardLazyResponse, StandardRequest } from '@standardserver/core'
import type { DecodePeerMessageOptions, EncodePeerMessageOptions } from '@standardserver/peer'
import type { DecodePeerMessageOptions, EncodePeerMessageOptions, ServerPeerSendMessage } from '@standardserver/peer'
import type { ClientContext, ClientOptions } from '../../types'
import type { StandardLinkTransport } from '../standard'
import type { SupportedMessagePort } from './message-port'
import { isPlainObject, toStringOrBytes, value } from '@orpc/shared'
import { ClientPeer, decodePeerMessage, encodePeerMessage, isServerPeerSendMessage } from '@standardserver/peer'
import { value } from '@orpc/shared'
import { ClientPeer, decodePeerMessage, encodePeerMessage, isPeerMessage, isServerPeerSendMessage } from '@standardserver/peer'
import { onMessagePortClose, onMessagePortMessage, postMessagePortMessage } from './message-port'

type DecodedRequestMessage = ConstructorParameters<typeof ClientPeer>[0] extends (message: infer TMessage) => unknown
Expand Down Expand Up @@ -53,23 +53,35 @@ export class MessagePortLinkTransport<T extends ClientContext> implements Standa
}
})

onMessagePortMessage(port, async (message) => {
if (isPlainObject(message)) {
await this.peer.message(message as any)
return
}
onMessagePortMessage(port, async (data) => {
let peerMessage: ServerPeerSendMessage | undefined

const encodedMessage = await toStringOrBytes(message)
if (typeof data === 'string' || data instanceof Uint8Array) {
// MessagePort receives the exact payload sent, and `encodePeerMessage` only returns string or Uint8Array.
const result = decodePeerMessage(data as string | Uint8Array<ArrayBuffer>, decodePeerMessageOptions)
if (result.matched && isServerPeerSendMessage(result.message)) {
peerMessage = result.message
}
}

const result = decodePeerMessage(encodedMessage, decodePeerMessageOptions)
else if (isPeerMessage(data) && isServerPeerSendMessage(data)) {
peerMessage = data
}

if (result.matched && isServerPeerSendMessage(result.message)) {
await this.peer.message(result.message)
if (peerMessage === undefined) {
return { matched: false }
}

/**
* Message order is important: loading -> decode -> .message.
* This flow must stay synchronous, or we need to use `sequential` helper
*/
await this.peer.message(peerMessage)
return { matched: true }
})

onMessagePortClose(port, () => {
this.peer.close()
onMessagePortClose(port, async () => {
await this.peer.close()
})
}

Expand Down
17 changes: 12 additions & 5 deletions packages/client/src/adapters/websocket/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { StandardLazyResponse, StandardRequest } from '@standardserver/core
import type { DecodePeerMessageOptions, EncodePeerMessageOptions } from '@standardserver/peer'
import type { ClientContext, ClientOptions } from '../../types'
import type { StandardLinkTransport } from '../standard'
import { AbortError, promiseWithResolvers, runWithSignal, sleep, toStringOrBytes } from '@orpc/shared'
import { AbortError, loadBytes, promiseWithResolvers, runWithSignal, sequential, sleep, toStringOrBytes } from '@orpc/shared'
import { ClientPeer, decodePeerMessage, encodePeerMessage, isServerPeerSendMessage } from '@standardserver/peer'

/**
Expand Down Expand Up @@ -196,13 +196,20 @@ export class WebSocketLinkTransport<T extends ClientContext> implements Standard
})
}

websocket.addEventListener('message', async (event: MessageEvent) => {
const message = await toStringOrBytes(event.data)
/**
* Message order is important: loading -> decode -> .message.
* This flow must stay synchronous, or we need to use `sequential` helper
*/
websocket.addEventListener('message', sequential(async (event: MessageEvent) => {
// For better compatibility avoid control or depend on websocket.binaryType
const message = event.data instanceof Blob ? await loadBytes(event.data) : toStringOrBytes(event.data)
const result = decodePeerMessage(message, this.decodePeerMessageOptions)
if (result.matched && isServerPeerSendMessage(result.message)) {
await peer.message(result.message)
// Not awaited: `peer.message` runs handling message may be slow,
// and awaiting it would block decoding of subsequent messages.
peer.message(result.message)
}
})
}))

websocket.addEventListener('close', async (event) => {
connectingResolvers?.resolve()
Expand Down
17 changes: 11 additions & 6 deletions packages/server/src/adapters/crossws/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ export class experimental_CrosswsHandler<T extends Context> {
}

/**
* Handles a single message received from a crossws Peer.
* Handles a single WebSocket message.
*
* @param ws The crossws Peer instance, require consistent instance across messages for proper peer management
* Message order matters. Call this immediately after receiving a message,
* before any other async work, to preserve ordering.
*
* @param ws The crossws peer instance. Use the same instance for all messages.
*/
async message(
ws: CrosswsPeerLike,
Expand All @@ -54,10 +57,12 @@ export class experimental_CrosswsHandler<T extends Context> {
}))
}

/**
* Message order is important: loading -> decode -> .message.
* This flow must stay synchronous, or we need to use `sequential` helper
*/
const encodedMessage = typeof message.rawData === 'string' ? message.rawData : message.uint8Array() as Uint8Array<ArrayBuffer>

const result = decodePeerMessage(encodedMessage, this.decodePeerMessageOptions)

if (result.matched && isClientPeerSendMessage(result.message)) {
await peer.message(
result.message,
Expand All @@ -69,9 +74,9 @@ export class experimental_CrosswsHandler<T extends Context> {
}

/**
* Closes the peer connection and cleans up associated resources.
* Cleans up peer state for a closed WebSocket.
*
* @param ws The crossws Peer instance to close, require consistent instance for proper peer management
* @param ws The same crossws peer instance passed to `.message()`.
*/
async close(ws: CrosswsPeerLike): Promise<void> {
const server = this.peers.get(ws)
Expand Down
62 changes: 34 additions & 28 deletions packages/server/src/adapters/message-port/handler.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import type { SupportedMessagePort } from '@orpc/client/message-port'
import type { MaybeOptionalOptions, Promisable, Value } from '@orpc/shared'
import type { DecodePeerMessageOptions, EncodePeerMessageOptions } from '@standardserver/peer'
import type { ClientPeerSendMessage, DecodePeerMessageOptions, EncodePeerMessageOptions } from '@standardserver/peer'
import type { Context } from '../../context'
import type { StandardHandler } from '../standard'
import type { StandardPeerRequestHandlerOptions } from '../standard-peer'
import { onMessagePortClose, onMessagePortMessage, postMessagePortMessage } from '@orpc/client/message-port'
import { isPlainObject, resolveMaybeOptionalOptions, toStringOrBytes, value } from '@orpc/shared'
import { decodePeerMessage, encodePeerMessage, isClientPeerSendMessage, ServerPeer } from '@standardserver/peer'
import { resolveMaybeOptionalOptions, value } from '@orpc/shared'
import { decodePeerMessage, encodePeerMessage, isClientPeerSendMessage, isPeerMessage, ServerPeer } from '@standardserver/peer'
import { createStandardPeerRequestHandler } from '../standard-peer'

type DecodedResponseMessage = ConstructorParameters<typeof ServerPeer>[0] extends (message: infer TMessage) => unknown
Expand Down Expand Up @@ -54,26 +54,30 @@ export class MessagePortHandler<T extends Context> {
}

/**
* Attaches necessary event listeners to a message port to handle incoming messages and peer management.
* Attaches message and close listeners to a message port.
*
* Prefer this over calling `.message()` and `.close()` manually.
*/
upgrade(
port: SupportedMessagePort,
...rest: MaybeOptionalOptions<StandardPeerRequestHandlerOptions<T>>
): void {
/**
* Message order is important: loading -> decode -> .message.
* This flow must stay synchronous, or we need to use `sequential` helper
*/
onMessagePortMessage(port, message => this.message(port, message, ...rest))
onMessagePortClose(port, () => this.close(port))
}

/**
* Handles a single message received from a message port.
*
* @warning AVOID calling this method directly if `.upgrade()` is used, as `.upgrade()` already sets up necessary event listeners to call this method for incoming messages and manage peer lifecycle.
*
* @param port The message port instance, require consistent instance across messages for proper peer management
* @param port The message port instance. Use the same instance for all messages.
*/
async message(
port: SupportedMessagePort,
data: any,
data: unknown,
...rest: MaybeOptionalOptions<StandardPeerRequestHandlerOptions<T>>
): Promise<{ matched: boolean }> {
let peer = this.peers.get(port)
Expand All @@ -91,42 +95,44 @@ export class MessagePortHandler<T extends Context> {
}))
}

if (isPlainObject(data)) {
await peer.message(
data as any,
createStandardPeerRequestHandler(this.handler, resolveMaybeOptionalOptions(rest)),
)
let peerMessage: ClientPeerSendMessage | undefined

return { matched: true }
if (typeof data === 'string' || data instanceof Uint8Array) {
// MessagePort receives the exact payload sent, and `encodePeerMessage` only returns string or Uint8Array.
const result = decodePeerMessage(data as string | Uint8Array<ArrayBuffer>, this.decodePeerMessageOptions)
if (result.matched && isClientPeerSendMessage(result.message)) {
peerMessage = result.message
}
}

const message = await toStringOrBytes(data)

const result = decodePeerMessage(message, this.decodePeerMessageOptions)
else if (isPeerMessage(data) && isClientPeerSendMessage(data)) {
peerMessage = data
}

if (result.matched && isClientPeerSendMessage(result.message)) {
await peer.message(
result.message,
createStandardPeerRequestHandler(this.handler, resolveMaybeOptionalOptions(rest)),
)
if (peerMessage === undefined) {
return { matched: false }
}

return result
/**
* Message order is important: loading -> decode -> .message.
* This flow must stay synchronous, or we need to use `sequential` helper
*/
await peer.message(peerMessage, createStandardPeerRequestHandler(this.handler, resolveMaybeOptionalOptions(rest)))
return { matched: true }
}

/**
* Called when a message port is closed, to clean up any associated peer state.
* Cleans up peer state for a closed message port.
*
* @warning AVOID calling this method directly if `.upgrade()` is used, as `.upgrade()` already sets up necessary event listeners to call this method for incoming messages and manage peer lifecycle.
*
* @param port The message port instance to clean up, must be the same instance used in `.message()` calls to properly clean up
* @param port The same message port instance passed to `.message()`.
*/
async close(port: SupportedMessagePort): Promise<void> {
const peer = this.peers.get(port)

if (peer) {
await peer.close()
// delete before close to avoid potential race conditions
this.peers.delete(port)
await peer.close()
}
}
}
9 changes: 9 additions & 0 deletions packages/server/src/adapters/message-port/rpc-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ describe('rpcHandler', () => {
})

expect(postMessage).not.toHaveBeenCalled()

await handler.close(serverPort as any) // safe to invoke multiple times
})

it('can receive and send un-encoded messages with transfer option (structured clone)', async () => {
Expand Down Expand Up @@ -285,4 +287,11 @@ describe('rpcHandler', () => {

expect(ws.send).not.toHaveBeenCalled()
})

it('ignore invalid message format', async () => {
const handler = createHandler()
const { serverPort } = createPort()
const result = await handler.message(serverPort as any, { invalid: true })
expect(result.matched).toBe(false)
})
})
Loading
Loading