From 38caad7b288ad9bf2a6603341d9ac0be73f0fff4 Mon Sep 17 00:00:00 2001 From: Danny Canter Date: Mon, 17 Nov 2025 02:24:20 -0800 Subject: [PATCH] VirtualMachineInstance: Remove stopListen Remove this in favor of just having the VsockConnectionStream type (renamed to VsockListener) stop the listen in finish(). --- Sources/Containerization/LinuxProcess.swift | 21 ++++++++-------- .../Containerization/UnixSocketRelay.swift | 25 +++++++++++-------- .../VZVirtualMachineInstance.swift | 6 ++--- .../VirtualMachineInstance.swift | 4 +-- ...ectionStream.swift => VsockListener.swift} | 14 ++++++----- 5 files changed, 36 insertions(+), 34 deletions(-) rename Sources/Containerization/{VsockConnectionStream.swift => VsockListener.swift} (82%) diff --git a/Sources/Containerization/LinuxProcess.swift b/Sources/Containerization/LinuxProcess.swift index 3d62016f..4e6ed024 100644 --- a/Sources/Containerization/LinuxProcess.swift +++ b/Sources/Containerization/LinuxProcess.swift @@ -121,18 +121,17 @@ public final class LinuxProcess: Sendable { } extension LinuxProcess { - func setupIO(streams: [VsockConnectionStream?]) async throws -> [FileHandle?] { + func setupIO(listeners: [VsockListener?]) async throws -> [FileHandle?] { let handles = try await Timeout.run(seconds: 3) { try await withThrowingTaskGroup(of: (Int, FileHandle?).self) { group in var results = [FileHandle?](repeating: nil, count: 3) - for (index, stream) in streams.enumerated() { - guard let stream = stream else { continue } + for (index, listener) in listeners.enumerated() { + guard let listener else { continue } group.addTask { - let first = await stream.first(where: { _ in true }) - stream.finish() - try self.vm.stopListen(stream.port) + let first = await listener.first(where: { _ in true }) + try listener.finish() return (index, first) } } @@ -233,12 +232,12 @@ extension LinuxProcess { public func start() async throws { do { let spec = self.state.withLock { $0.spec } - var streams = [VsockConnectionStream?](repeating: nil, count: 3) + var listeners = [VsockListener?](repeating: nil, count: 3) if let stdin = self.ioSetup.stdin { - streams[0] = try self.vm.listen(stdin.port) + listeners[0] = try self.vm.listen(stdin.port) } if let stdout = self.ioSetup.stdout { - streams[1] = try self.vm.listen(stdout.port) + listeners[1] = try self.vm.listen(stdout.port) } if let stderr = self.ioSetup.stderr { if spec.process!.terminal { @@ -247,11 +246,11 @@ extension LinuxProcess { message: "stderr should not be configured with terminal=true" ) } - streams[2] = try self.vm.listen(stderr.port) + listeners[2] = try self.vm.listen(stderr.port) } let t = Task { - try await self.setupIO(streams: streams) + try await self.setupIO(listeners: listeners) } try await agent.createProcess( diff --git a/Sources/Containerization/UnixSocketRelay.swift b/Sources/Containerization/UnixSocketRelay.swift index 27201004..87391e8e 100644 --- a/Sources/Containerization/UnixSocketRelay.swift +++ b/Sources/Containerization/UnixSocketRelay.swift @@ -88,6 +88,7 @@ package final class SocketRelay: Sendable { private struct State { var relaySources: [String: ConnectionSources] = [:] var t: Task<(), Never>? = nil + var listener: VsockListener? = nil } // `DispatchSourceRead` is thread-safe. @@ -137,15 +138,15 @@ extension SocketRelay { t.cancel() $0.t = nil $0.relaySources.removeAll() - } - switch configuration.direction { - case .outOf: - // If we created the host conn, lets unlink it also. It's possible it was - // already unlinked if the relay failed earlier. - try? FileManager.default.removeItem(at: self.configuration.destination) - case .into: - try self.vm.stopListen(self.port) + switch configuration.direction { + case .outOf: + // If we created the host conn, lets unlink it also. It's possible it was + // already unlinked if the relay failed earlier. + try? FileManager.default.removeItem(at: self.configuration.destination) + case .into: + try $0.listener?.finish() + } } } @@ -190,18 +191,20 @@ extension SocketRelay { let port = self.port let log = self.log - let connectionStream = try self.vm.listen(self.port) + let listener = try self.vm.listen(self.port) log?.info( "listening on guest vsock", metadata: [ "path": "\(hostPath)", "vport": "\(port)", ]) + self.state.withLock { + $0.listener = listener $0.t = Task { do { - defer { connectionStream.finish() } - for await connection in connectionStream { + defer { try? listener.finish() } + for await connection in listener { try await self.handleGuestVsockConn( vsockConn: connection, hostConnectionPath: hostPath, diff --git a/Sources/Containerization/VZVirtualMachineInstance.swift b/Sources/Containerization/VZVirtualMachineInstance.swift index 3e1baa30..3ccd37a2 100644 --- a/Sources/Containerization/VZVirtualMachineInstance.swift +++ b/Sources/Containerization/VZVirtualMachineInstance.swift @@ -226,8 +226,8 @@ extension VZVirtualMachineInstance: VirtualMachineInstance { } } - func listen(_ port: UInt32) throws -> VsockConnectionStream { - let stream = VsockConnectionStream(port: port) + func listen(_ port: UInt32) throws -> VsockListener { + let stream = VsockListener(port: port, stopListen: self.stopListen) let listener = VZVirtioSocketListener() listener.delegate = stream @@ -239,7 +239,7 @@ extension VZVirtualMachineInstance: VirtualMachineInstance { return stream } - func stopListen(_ port: UInt32) throws { + private func stopListen(_ port: UInt32) throws { try self.vm.removeListener( queue: queue, port: port diff --git a/Sources/Containerization/VirtualMachineInstance.swift b/Sources/Containerization/VirtualMachineInstance.swift index 15ed0a21..d89149f6 100644 --- a/Sources/Containerization/VirtualMachineInstance.swift +++ b/Sources/Containerization/VirtualMachineInstance.swift @@ -40,9 +40,7 @@ public protocol VirtualMachineInstance: Sendable { /// Dial a vsock port in the guest. func dial(_ port: UInt32) async throws -> FileHandle /// Listen on a host vsock port. - func listen(_ port: UInt32) throws -> VsockConnectionStream - /// Stop listening on a vsock port. - func stopListen(_ port: UInt32) throws + func listen(_ port: UInt32) throws -> VsockListener /// Start the virtual machine. func start() async throws /// Stop the virtual machine. diff --git a/Sources/Containerization/VsockConnectionStream.swift b/Sources/Containerization/VsockListener.swift similarity index 82% rename from Sources/Containerization/VsockConnectionStream.swift rename to Sources/Containerization/VsockListener.swift index c985410a..361b53b9 100644 --- a/Sources/Containerization/VsockConnectionStream.swift +++ b/Sources/Containerization/VsockListener.swift @@ -21,25 +21,27 @@ import Virtualization #endif /// A stream of vsock connections. -public final class VsockConnectionStream: NSObject, Sendable, AsyncSequence { +public final class VsockListener: NSObject, Sendable, AsyncSequence { public typealias Element = FileHandle - /// A stream of connections dialed from the remote. - private let connections: AsyncStream /// The port the connections are for. public let port: UInt32 + private let connections: AsyncStream private let cont: AsyncStream.Continuation + private let stopListening: @Sendable (_ port: UInt32) throws -> Void - public init(port: UInt32) { + package init(port: UInt32, stopListen: @Sendable @escaping (_ port: UInt32) throws -> Void) { self.port = port let (stream, continuation) = AsyncStream.makeStream(of: FileHandle.self) self.connections = stream self.cont = continuation + self.stopListening = stopListen } - public func finish() { + public func finish() throws { self.cont.finish() + try self.stopListening(self.port) } public func makeAsyncIterator() -> AsyncStream.AsyncIterator { @@ -49,7 +51,7 @@ public final class VsockConnectionStream: NSObject, Sendable, AsyncSequence { #if os(macOS) -extension VsockConnectionStream: VZVirtioSocketListenerDelegate { +extension VsockListener: VZVirtioSocketListenerDelegate { public func listener( _: VZVirtioSocketListener, shouldAcceptNewConnection conn: VZVirtioSocketConnection, from _: VZVirtioSocketDevice