@@ -44,6 +44,7 @@ final class ManagedProcess: Sendable {
4444 private struct State {
4545 init ( io: IO ) {
4646 self . io = io
47+ self . errorPipe = errorPipe
4748 }
4849
4950 let io : IO
@@ -63,6 +64,7 @@ final class ManagedProcess: Sendable {
6364 private let owningPid : Int32 ?
6465 private let ackPipe : Pipe
6566 private let syncPipe : Pipe
67+ private let errorPipe : Pipe
6668 private let terminal : Bool
6769 private let bundle : ContainerizationOCI . Bundle
6870 private let cgroupManager : Cgroup2Manager ?
@@ -95,6 +97,10 @@ final class ManagedProcess: Sendable {
9597 try ackPipe. setCloexec ( )
9698 self . ackPipe = ackPipe
9799
100+ let errorPipe = Pipe ( )
101+ try errorPipe. setCloexec ( )
102+ self . errorPipe = errorPipe
103+
98104 let args : [ String ]
99105 if let owningPid {
100106 args = [
@@ -114,6 +120,7 @@ final class ManagedProcess: Sendable {
114120 extraFiles: [
115121 syncPipe. fileHandleForWriting,
116122 ackPipe. fileHandleForReading,
123+ errorPipe. fileHandleForWriting,
117124 ]
118125 )
119126
@@ -149,125 +156,140 @@ final class ManagedProcess: Sendable {
149156
150157extension ManagedProcess {
151158 func start( ) throws -> Int32 {
152- try self . state. withLock {
153- log. info (
154- " starting managed process " ,
155- metadata: [
156- " id " : " \( id) "
157- ] )
158-
159- // Start the underlying process.
160- try command. start ( )
161- defer {
162- try ? self . ackPipe. fileHandleForWriting. close ( )
163- try ? self . syncPipe. fileHandleForReading. close ( )
164- try ? self . ackPipe. fileHandleForReading. close ( )
165- try ? self . syncPipe. fileHandleForWriting. close ( )
166- }
167-
168- // Close our side of any pipes.
169- try $0. io. closeAfterExec ( )
170- try self . ackPipe. fileHandleForReading. close ( )
171- try self . syncPipe. fileHandleForWriting. close ( )
159+ do {
160+ return try self . state. withLock {
161+ log. info (
162+ " starting managed process " ,
163+ metadata: [
164+ " id " : " \( id) "
165+ ] )
172166
173- let size = MemoryLayout< Int32> . size
174- guard let piddata = try syncPipe. fileHandleForReading. read ( upToCount: size) else {
175- throw ContainerizationError ( . internalError, message: " no PID data from sync pipe " )
176- }
167+ // Start the underlying process.
168+ try command. start ( )
177169
178- guard piddata. count == size else {
179- throw ContainerizationError ( . internalError, message: " invalid payload " )
180- }
170+ defer {
171+ try ? self . ackPipe. fileHandleForWriting. close ( )
172+ try ? self . syncPipe. fileHandleForReading. close ( )
173+ try ? self . ackPipe. fileHandleForReading. close ( )
174+ try ? self . syncPipe. fileHandleForWriting. close ( )
175+ try ? self . errorPipe. fileHandleForWriting. close ( )
176+ }
181177
182- let pid = piddata. withUnsafeBytes { ptr in
183- ptr. load ( as: Int32 . self)
184- }
178+ // Close our side of any pipes.
179+ try $0. io. closeAfterExec ( )
180+ try self . ackPipe. fileHandleForReading. close ( )
181+ try self . syncPipe. fileHandleForWriting. close ( )
185182
186- log. info (
187- " got back pid data " ,
188- metadata: [
189- " pid " : " \( pid) "
190- ] )
191- $0. pid = pid
183+ let size = MemoryLayout< Int32> . size
184+ guard let piddata = try syncPipe. fileHandleForReading. read ( upToCount: size) else {
185+ throw ContainerizationError ( . internalError, message: " no PID data from sync pipe " )
186+ }
192187
193- // This should probably happen in vmexec, but we don't need to set any cgroup
194- // toggles so the problem is much simpler to just do it here.
195- if let owningPid {
196- let cgManager = try Cgroup2Manager . loadFromPid ( pid: owningPid)
197- try cgManager. addProcess ( pid: pid)
198- }
188+ guard piddata. count == size else {
189+ throw ContainerizationError ( . internalError, message: " invalid payload " )
190+ }
199191
200- log. info (
201- " sending pid acknowledgement " ,
202- metadata: [
203- " pid " : " \( pid) "
204- ] )
205- try self . ackPipe. fileHandleForWriting. write ( contentsOf: Self . ackPid. data ( using: . utf8) !)
192+ let pid = piddata. withUnsafeBytes { ptr in
193+ ptr. load ( as: Int32 . self)
194+ }
206195
207- if self . terminal {
208196 log. info (
209- " wait for PTY FD " ,
197+ " got back pid data " ,
210198 metadata: [
211- " id " : " \( id ) "
199+ " pid " : " \( pid ) "
212200 ] )
201+ $0. pid = pid
213202
214- // Wait for a new write that will contain the pty fd if we asked for one.
215- guard let ptyFd = try self . syncPipe. fileHandleForReading. read ( upToCount: size) else {
216- throw ContainerizationError (
217- . internalError,
218- message: " no PTY data from sync pipe "
219- )
220- }
221- let fd = ptyFd. withUnsafeBytes { ptr in
222- ptr. load ( as: Int32 . self)
203+ // This should probably happen in vmexec, but we don't need to set any cgroup
204+ // toggles so the problem is much simpler to just do it here.
205+ if let owningPid {
206+ let cgManager = try Cgroup2Manager . loadFromPid ( pid: owningPid)
207+ try cgManager. addProcess ( pid: pid)
223208 }
209+
224210 log. info (
225- " received PTY FD from container, attaching " ,
211+ " sending pid acknowledgement " ,
226212 metadata: [
227- " id " : " \( id ) "
213+ " pid " : " \( pid ) "
228214 ] )
215+ try self . ackPipe. fileHandleForWriting. write ( contentsOf: Self . ackPid. data ( using: . utf8) !)
216+
217+ if self . terminal {
218+ log. info (
219+ " wait for PTY FD " ,
220+ metadata: [
221+ " id " : " \( id) "
222+ ] )
223+
224+ // Wait for a new write that will contain the pty fd if we asked for one.
225+ guard let ptyFd = try self . syncPipe. fileHandleForReading. read ( upToCount: size) else {
226+ throw ContainerizationError (
227+ . internalError,
228+ message: " no PTY data from sync pipe "
229+ )
230+ }
231+ let fd = ptyFd. withUnsafeBytes { ptr in
232+ ptr. load ( as: Int32 . self)
233+ }
234+ log. info (
235+ " received PTY FD from container, attaching " ,
236+ metadata: [
237+ " id " : " \( id) "
238+ ] )
239+
240+ try $0. io. attach ( pid: pid, fd: fd)
241+ try self . ackPipe. fileHandleForWriting. write ( contentsOf: Self . ackConsole. data ( using: . utf8) !)
242+ }
229243
230- try $0. io. attach ( pid: pid, fd: fd)
231- try self . ackPipe. fileHandleForWriting. write ( contentsOf: Self . ackConsole. data ( using: . utf8) !)
232- }
233-
234- // Wait for the syncPipe to close (after exec).
235- _ = try self . syncPipe. fileHandleForReading. readToEnd ( )
244+ // Wait for the syncPipe to close (after exec).
245+ _ = try self . syncPipe. fileHandleForReading. readToEnd ( )
236246
237- log. info (
238- " started managed process " ,
239- metadata: [
240- " pid " : " \( pid) " ,
241- " id " : " \( id) " ,
242- ] )
247+ log. info (
248+ " started managed process " ,
249+ metadata: [
250+ " pid " : " \( pid) " ,
251+ " id " : " \( id) " ,
252+ ] )
243253
244- return pid
254+ return pid
255+ }
256+ } catch {
257+ if let errorData = try ? self . errorPipe. fileHandleForReading. readToEnd ( ) ,
258+ let errorString = String ( data: errorData, encoding: . utf8) ,
259+ !errorString. isEmpty
260+ {
261+ throw ContainerizationError (
262+ . internalError,
263+ message: " vmexec error: \( errorString. trimmingCharacters ( in: . whitespacesAndNewlines) ) "
264+ )
265+ }
266+ throw error
245267 }
246268 }
247269
248270 func setExit( _ status: Int32 ) {
249- self . state. withLock {
271+ self . state. withLock { state in
250272 self . log. info (
251273 " managed process exit " ,
252274 metadata: [
253275 " status " : " \( status) "
254276 ] )
255277
256278 let exitStatus = ExitStatus ( exitStatus: status, exitedAt: Date . now)
257- $0 . exitStatus = exitStatus
279+ state . exitStatus = exitStatus
258280
259281 do {
260- try $0 . io. close ( )
282+ try state . io. close ( )
261283 } catch {
262284 self . log. error ( " failed to close I/O for process: \( error) " )
263285 }
264286
265- for waiter in $0 . waiters {
287+ for waiter in state . waiters {
266288 waiter. resume ( returning: exitStatus)
267289 }
268290
269- self . log. debug ( " \( $0 . waiters. count) managed process waiters signaled " )
270- $0 . waiters. removeAll ( )
291+ self . log. debug ( " \( state . waiters. count) managed process waiters signaled " )
292+ state . waiters. removeAll ( )
271293 }
272294 }
273295
0 commit comments