Skip to content

Commit ffb35af

Browse files
authored
fix: race signal instead of setting up listeners (#36)
Ensures listeners are removed as appropriate.
1 parent 51d83bd commit ffb35af

File tree

4 files changed

+209
-201
lines changed

4 files changed

+209
-201
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
"@multiformats/multiaddr": "^12.4.0",
6767
"@multiformats/multiaddr-matcher": "^2.0.1",
6868
"it-stream-types": "^2.0.2",
69+
"race-signal": "^1.1.3",
6970
"uint8arraylist": "^2.4.8"
7071
},
7172
"devDependencies": {

src/listener.ts

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { setMaxListeners, TypedEventEmitter } from '@libp2p/interface'
22
import { multiaddr } from '@multiformats/multiaddr'
3+
import { raceSignal } from 'race-signal'
34
import { QuicConnection } from './connection.js'
45
import * as napi from './napi.js'
56
import { QuicStreamMuxerFactory } from './stream-muxer.js'
@@ -137,30 +138,27 @@ export class QuicListener extends TypedEventEmitter<ListenerEvents> implements L
137138
if (this.state.status === 'listening') {
138139
const signal = this.state.controller.signal
139140
const listenAddr = this.state.listenAddr
140-
const aborted = new Promise((resolve) => {
141-
signal.addEventListener('abort', () => { resolve(undefined) }, { once: true })
142-
})
141+
143142
while (true) {
144143
try {
145144
const listenerPromise = this.state.listener.inboundConnection()
146145
listenerPromise
147146
.then(() => this.metrics?.events.increment({ connect: true }))
148147
.catch(() => this.metrics?.events.increment({ error: true }))
149-
const connection = await Promise.race([
150-
aborted,
151-
listenerPromise
152-
]) as napi.Connection | undefined
153-
if (connection == null) {
154-
break
155-
}
156148

149+
const connection = await raceSignal(listenerPromise, signal)
157150
this.onInboundConnection(connection).catch((e) => {
158151
this.log.error('%s error handling inbound connection', listenAddr.toString(), e)
159152
})
160153
} catch (e) {
161154
this.log.error('%s error accepting connection', listenAddr.toString(), e)
155+
156+
if (signal.aborted) {
157+
break
158+
}
162159
}
163160
}
161+
164162
this.log('%s no longer awaiting inbound connections', listenAddr.toString())
165163
}
166164
}

src/stream-muxer.ts

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { raceSignal } from 'race-signal'
12
import { QuicStream } from './stream.js'
23
import type * as napi from './napi.js'
34
import type { AbortOptions, ComponentLogger, Logger, Stream, StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface'
@@ -59,24 +60,20 @@ class QuicStreamMuxer implements StreamMuxer {
5960
}
6061

6162
async awaitInboundStreams (): Promise<void> {
62-
const aborted = new Promise((resolve) => {
63-
this.controller.signal.addEventListener('abort', () => { resolve(undefined) }, { once: true })
64-
})
6563
while (true) {
66-
const stream = await Promise.race([
67-
aborted,
68-
this.#connection.inboundStream()
69-
]) as napi.Stream | undefined
70-
if (stream == null) {
71-
break
72-
}
73-
7464
try {
65+
const stream = await raceSignal(this.#connection.inboundStream(), this.controller.signal)
66+
7567
this.onInboundStream(stream)
7668
} catch (e) {
77-
this.log.error('%s error accepting stream', this.id, e)
69+
this.log.error('%s error accepting stream - %e', this.id, e)
70+
71+
if (this.controller.signal.aborted) {
72+
break
73+
}
7874
}
7975
}
76+
8077
this.log('%s no longer awaiting inbound streams', this.id)
8178
}
8279

@@ -101,6 +98,7 @@ class QuicStreamMuxer implements StreamMuxer {
10198

10299
async newStream (name?: string): Promise<Stream> {
103100
const str = await this.#connection.outboundStream()
101+
this.controller.signal.throwIfAborted()
104102
const stream = new QuicStream({
105103
id: str.id(),
106104
stream: str,

0 commit comments

Comments
 (0)