@@ -50,6 +50,7 @@ final class ManagedProcess: Sendable {
5050 var waiters : [ CheckedContinuation < ExitStatus , Never > ] = [ ]
5151 var exitStatus : ExitStatus ? = nil
5252 var pid : Int32 ?
53+ var errorPipe : Pipe ?
5354 }
5455
5556 private static let ackPid = " AckPid "
@@ -95,6 +96,9 @@ final class ManagedProcess: Sendable {
9596 try ackPipe. setCloexec ( )
9697 self . ackPipe = ackPipe
9798
99+ let errorPipe = Pipe ( )
100+ try errorPipe. setCloexec ( )
101+
98102 let args : [ String ]
99103 if let owningPid {
100104 args = [
@@ -114,6 +118,7 @@ final class ManagedProcess: Sendable {
114118 extraFiles: [
115119 syncPipe. fileHandleForWriting,
116120 ackPipe. fileHandleForReading,
121+ errorPipe. fileHandleForWriting,
117122 ]
118123 )
119124
@@ -144,130 +149,153 @@ final class ManagedProcess: Sendable {
144149 self . terminal = stdio. terminal
145150 self . bundle = bundle
146151 self . state = Mutex ( State ( io: io) )
152+ self . state. withLock { state in
153+ state. errorPipe = errorPipe
154+ }
147155 }
148156}
149157
150158extension ManagedProcess {
151159 func start( ) throws -> Int32 {
152- try self . state. withLock {
153- log. info (
154- " starting managed process " ,
155- metadata: [
156- " id " : " \( id) "
157- ] )
158-
159- // Start the underlying process.
160- try command. start ( )
161- defer {
162- try ? self . ackPipe. fileHandleForWriting. close ( )
163- try ? self . syncPipe. fileHandleForReading. close ( )
164- try ? self . ackPipe. fileHandleForReading. close ( )
165- try ? self . syncPipe. fileHandleForWriting. close ( )
166- }
167-
168- // Close our side of any pipes.
169- try $0. io. closeAfterExec ( )
170- try self . ackPipe. fileHandleForReading. close ( )
171- try self . syncPipe. fileHandleForWriting. close ( )
160+ do {
161+ try self . state. withLock {
162+ log. info (
163+ " starting managed process " ,
164+ metadata: [
165+ " id " : " \( id) "
166+ ] )
172167
173- let size = MemoryLayout< Int32> . size
174- guard let piddata = try syncPipe. fileHandleForReading. read ( upToCount: size) else {
175- throw ContainerizationError ( . internalError, message: " no PID data from sync pipe " )
176- }
168+ // Start the underlying process.
169+ try command. start ( )
177170
178- guard piddata. count == size else {
179- throw ContainerizationError ( . internalError, message: " invalid payload " )
180- }
171+ defer {
172+ try ? self . ackPipe. fileHandleForWriting. close ( )
173+ try ? self . syncPipe. fileHandleForReading. close ( )
174+ try ? self . ackPipe. fileHandleForReading. close ( )
175+ try ? self . syncPipe. fileHandleForWriting. close ( )
176+ }
181177
182- let pid = piddata. withUnsafeBytes { ptr in
183- ptr. load ( as: Int32 . self)
184- }
178+ // Close our side of any pipes.
179+ try $0. io. closeAfterExec ( )
180+ try self . ackPipe. fileHandleForReading. close ( )
181+ try self . syncPipe. fileHandleForWriting. close ( )
185182
186- log. info (
187- " got back pid data " ,
188- metadata: [
189- " pid " : " \( pid) "
190- ] )
191- $0. pid = pid
183+ let size = MemoryLayout< Int32> . size
184+ guard let piddata = try syncPipe. fileHandleForReading. read ( upToCount: size) else {
185+ throw ContainerizationError ( . internalError, message: " no PID data from sync pipe " )
186+ }
192187
193- // This should probably happen in vmexec, but we don't need to set any cgroup
194- // toggles so the problem is much simpler to just do it here.
195- if let owningPid {
196- let cgManager = try Cgroup2Manager . loadFromPid ( pid: owningPid)
197- try cgManager. addProcess ( pid: pid)
198- }
188+ guard piddata. count == size else {
189+ throw ContainerizationError ( . internalError, message: " invalid payload " )
190+ }
199191
200- log. info (
201- " sending pid acknowledgement " ,
202- metadata: [
203- " pid " : " \( pid) "
204- ] )
205- try self . ackPipe. fileHandleForWriting. write ( contentsOf: Self . ackPid. data ( using: . utf8) !)
192+ let pid = piddata. withUnsafeBytes { ptr in
193+ ptr. load ( as: Int32 . self)
194+ }
206195
207- if self . terminal {
208196 log. info (
209- " wait for PTY FD " ,
197+ " got back pid data " ,
210198 metadata: [
211- " id " : " \( id ) "
199+ " pid " : " \( pid ) "
212200 ] )
201+ $0. pid = pid
213202
214- // Wait for a new write that will contain the pty fd if we asked for one.
215- guard let ptyFd = try self . syncPipe. fileHandleForReading. read ( upToCount: size) else {
216- throw ContainerizationError (
217- . internalError,
218- message: " no PTY data from sync pipe "
219- )
220- }
221- let fd = ptyFd. withUnsafeBytes { ptr in
222- ptr. load ( as: Int32 . self)
203+ // This should probably happen in vmexec, but we don't need to set any cgroup
204+ // toggles so the problem is much simpler to just do it here.
205+ if let owningPid {
206+ let cgManager = try Cgroup2Manager . loadFromPid ( pid: owningPid)
207+ try cgManager. addProcess ( pid: pid)
223208 }
209+
224210 log. info (
225- " received PTY FD from container, attaching " ,
211+ " sending pid acknowledgement " ,
226212 metadata: [
227- " id " : " \( id ) "
213+ " pid " : " \( pid ) "
228214 ] )
215+ try self . ackPipe. fileHandleForWriting. write ( contentsOf: Self . ackPid. data ( using: . utf8) !)
216+
217+ if self . terminal {
218+ log. info (
219+ " wait for PTY FD " ,
220+ metadata: [
221+ " id " : " \( id) "
222+ ] )
223+
224+ // Wait for a new write that will contain the pty fd if we asked for one.
225+ guard let ptyFd = try self . syncPipe. fileHandleForReading. read ( upToCount: size) else {
226+ throw ContainerizationError (
227+ . internalError,
228+ message: " no PTY data from sync pipe "
229+ )
230+ }
231+ let fd = ptyFd. withUnsafeBytes { ptr in
232+ ptr. load ( as: Int32 . self)
233+ }
234+ log. info (
235+ " received PTY FD from container, attaching " ,
236+ metadata: [
237+ " id " : " \( id) "
238+ ] )
239+
240+ try $0. io. attach ( pid: pid, fd: fd)
241+ try self . ackPipe. fileHandleForWriting. write ( contentsOf: Self . ackConsole. data ( using: . utf8) !)
242+ }
229243
230- try $0. io. attach ( pid: pid, fd: fd)
231- try self . ackPipe. fileHandleForWriting. write ( contentsOf: Self . ackConsole. data ( using: . utf8) !)
232- }
233-
234- // Wait for the syncPipe to close (after exec).
235- _ = try self . syncPipe. fileHandleForReading. readToEnd ( )
244+ // Wait for the syncPipe to close (after exec).
245+ _ = try self . syncPipe. fileHandleForReading. readToEnd ( )
236246
237- log. info (
238- " started managed process " ,
239- metadata: [
240- " pid " : " \( pid) " ,
241- " id " : " \( id) " ,
242- ] )
247+ log. info (
248+ " started managed process " ,
249+ metadata: [
250+ " pid " : " \( pid) " ,
251+ " id " : " \( id) " ,
252+ ] )
243253
244- return pid
254+ return pid
255+ }
256+ } catch {
257+ if let errorPipe = self . state. withLock ( { $0. errorPipe } ) {
258+ if let errorData = try ? errorPipe. fileHandleForReading. readToEnd ( ) ,
259+ let errorString = String ( data: errorData, encoding: . utf8) ,
260+ !errorString. isEmpty
261+ {
262+ throw ContainerizationError (
263+ . internalError,
264+ message: " vmexec error: \( errorString. trimmingCharacters ( in: . whitespacesAndNewlines) ) "
265+ )
266+ }
267+ }
268+ throw error
245269 }
246270 }
247271
248272 func setExit( _ status: Int32 ) {
249- self . state. withLock {
273+ self . state. withLock { state in
250274 self . log. info (
251275 " managed process exit " ,
252276 metadata: [
253277 " status " : " \( status) "
254278 ] )
255279
256280 let exitStatus = ExitStatus ( exitStatus: status, exitedAt: Date . now)
257- $0. exitStatus = exitStatus
281+ state. exitStatus = exitStatus
282+
283+ try ? state. errorPipe? . fileHandleForReading. close ( )
284+ try ? state. errorPipe? . fileHandleForWriting. close ( )
285+ state. errorPipe = nil
258286
259287 do {
260- try $0 . io. close ( )
288+ try state . io. close ( )
261289 } catch {
262290 self . log. error ( " failed to close I/O for process: \( error) " )
263291 }
264292
265- for waiter in $0 . waiters {
293+ for waiter in state . waiters {
266294 waiter. resume ( returning: exitStatus)
267295 }
268296
269- self . log. debug ( " \( $0 . waiters. count) managed process waiters signaled " )
270- $0 . waiters. removeAll ( )
297+ self . log. debug ( " \( state . waiters. count) managed process waiters signaled " )
298+ state . waiters. removeAll ( )
271299 }
272300 }
273301
0 commit comments