Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions Sources/Containerization/LinuxProcess.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,17 @@ public final class LinuxProcess: Sendable {
}

extension LinuxProcess {
func setupIO(streams: [VsockConnectionStream?]) async throws -> [FileHandle?] {
func setupIO(listeners: [VsockListener?]) async throws -> [FileHandle?] {
let handles = try await Timeout.run(seconds: 3) {
try await withThrowingTaskGroup(of: (Int, FileHandle?).self) { group in
var results = [FileHandle?](repeating: nil, count: 3)

for (index, stream) in streams.enumerated() {
guard let stream = stream else { continue }
for (index, listener) in listeners.enumerated() {
guard let listener else { continue }

group.addTask {
let first = await stream.first(where: { _ in true })
stream.finish()
try self.vm.stopListen(stream.port)
let first = await listener.first(where: { _ in true })
try listener.finish()
return (index, first)
}
}
Expand Down Expand Up @@ -233,12 +232,12 @@ extension LinuxProcess {
public func start() async throws {
do {
let spec = self.state.withLock { $0.spec }
var streams = [VsockConnectionStream?](repeating: nil, count: 3)
var listeners = [VsockListener?](repeating: nil, count: 3)
if let stdin = self.ioSetup.stdin {
streams[0] = try self.vm.listen(stdin.port)
listeners[0] = try self.vm.listen(stdin.port)
}
if let stdout = self.ioSetup.stdout {
streams[1] = try self.vm.listen(stdout.port)
listeners[1] = try self.vm.listen(stdout.port)
}
if let stderr = self.ioSetup.stderr {
if spec.process!.terminal {
Expand All @@ -247,11 +246,11 @@ extension LinuxProcess {
message: "stderr should not be configured with terminal=true"
)
}
streams[2] = try self.vm.listen(stderr.port)
listeners[2] = try self.vm.listen(stderr.port)
}

let t = Task {
try await self.setupIO(streams: streams)
try await self.setupIO(listeners: listeners)
}

try await agent.createProcess(
Expand Down
25 changes: 14 additions & 11 deletions Sources/Containerization/UnixSocketRelay.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ package final class SocketRelay: Sendable {
private struct State {
var relaySources: [String: ConnectionSources] = [:]
var t: Task<(), Never>? = nil
var listener: VsockListener? = nil
}

// `DispatchSourceRead` is thread-safe.
Expand Down Expand Up @@ -137,15 +138,15 @@ extension SocketRelay {
t.cancel()
$0.t = nil
$0.relaySources.removeAll()
}

switch configuration.direction {
case .outOf:
// If we created the host conn, lets unlink it also. It's possible it was
// already unlinked if the relay failed earlier.
try? FileManager.default.removeItem(at: self.configuration.destination)
case .into:
try self.vm.stopListen(self.port)
switch configuration.direction {
case .outOf:
// If we created the host conn, lets unlink it also. It's possible it was
// already unlinked if the relay failed earlier.
try? FileManager.default.removeItem(at: self.configuration.destination)
case .into:
try $0.listener?.finish()
}
}
}

Expand Down Expand Up @@ -190,18 +191,20 @@ extension SocketRelay {
let port = self.port
let log = self.log

let connectionStream = try self.vm.listen(self.port)
let listener = try self.vm.listen(self.port)
log?.info(
"listening on guest vsock",
metadata: [
"path": "\(hostPath)",
"vport": "\(port)",
])

self.state.withLock {
$0.listener = listener
$0.t = Task {
do {
defer { connectionStream.finish() }
for await connection in connectionStream {
defer { try? listener.finish() }
for await connection in listener {
try await self.handleGuestVsockConn(
vsockConn: connection,
hostConnectionPath: hostPath,
Expand Down
6 changes: 3 additions & 3 deletions Sources/Containerization/VZVirtualMachineInstance.swift
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ extension VZVirtualMachineInstance: VirtualMachineInstance {
}
}

func listen(_ port: UInt32) throws -> VsockConnectionStream {
let stream = VsockConnectionStream(port: port)
func listen(_ port: UInt32) throws -> VsockListener {
let stream = VsockListener(port: port, stopListen: self.stopListen)
let listener = VZVirtioSocketListener()
listener.delegate = stream

Expand All @@ -239,7 +239,7 @@ extension VZVirtualMachineInstance: VirtualMachineInstance {
return stream
}

func stopListen(_ port: UInt32) throws {
private func stopListen(_ port: UInt32) throws {
try self.vm.removeListener(
queue: queue,
port: port
Expand Down
4 changes: 1 addition & 3 deletions Sources/Containerization/VirtualMachineInstance.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ public protocol VirtualMachineInstance: Sendable {
/// Dial a vsock port in the guest.
func dial(_ port: UInt32) async throws -> FileHandle
/// Listen on a host vsock port.
func listen(_ port: UInt32) throws -> VsockConnectionStream
/// Stop listening on a vsock port.
func stopListen(_ port: UInt32) throws
func listen(_ port: UInt32) throws -> VsockListener
/// Start the virtual machine.
func start() async throws
/// Stop the virtual machine.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,27 @@ import Virtualization
#endif

/// A stream of vsock connections.
public final class VsockConnectionStream: NSObject, Sendable, AsyncSequence {
public final class VsockListener: NSObject, Sendable, AsyncSequence {
public typealias Element = FileHandle

/// A stream of connections dialed from the remote.
private let connections: AsyncStream<FileHandle>
/// The port the connections are for.
public let port: UInt32

private let connections: AsyncStream<FileHandle>
private let cont: AsyncStream<FileHandle>.Continuation
private let stopListening: @Sendable (_ port: UInt32) throws -> Void

public init(port: UInt32) {
package init(port: UInt32, stopListen: @Sendable @escaping (_ port: UInt32) throws -> Void) {
self.port = port
let (stream, continuation) = AsyncStream.makeStream(of: FileHandle.self)
self.connections = stream
self.cont = continuation
self.stopListening = stopListen
}

public func finish() {
public func finish() throws {
self.cont.finish()
try self.stopListening(self.port)
}

public func makeAsyncIterator() -> AsyncStream<FileHandle>.AsyncIterator {
Expand All @@ -49,7 +51,7 @@ public final class VsockConnectionStream: NSObject, Sendable, AsyncSequence {

#if os(macOS)

extension VsockConnectionStream: VZVirtioSocketListenerDelegate {
extension VsockListener: VZVirtioSocketListenerDelegate {
public func listener(
_: VZVirtioSocketListener, shouldAcceptNewConnection conn: VZVirtioSocketConnection,
from _: VZVirtioSocketDevice
Expand Down