@@ -394,8 +394,9 @@ final class ChildChannelMultiplexerTests: XCTestCase {
394394 let errorLogger = ErrorLoggingHandler ( )
395395
396396 let harness = self . harness { channel, _ in
397- channel. pipeline. addHandler ( errorLogger) . flatMap {
398- channel. eventLoop. makeFailedFuture ( MultiplexerTestError . rejected)
397+ channel. eventLoop. makeCompletedFuture {
398+ try channel. pipeline. syncOperations. addHandler ( errorLogger)
399+ throw MultiplexerTestError . rejected
399400 }
400401 }
401402 defer {
@@ -627,9 +628,10 @@ final class ChildChannelMultiplexerTests: XCTestCase {
627628 buffer. writeString ( " Hello from the unit tests! " )
628629
629630 harness. multiplexer. createChildChannel ( channelType: . session) { channel, _ in
630- channel. write ( SSHChannelData ( type: . channel, data: . byteBuffer( buffer) ) , promise: nil )
631- channel. write ( SSHChannelData ( type: . stdErr, data: . byteBuffer( buffer) ) , promise: nil )
632- channel. flush ( )
631+ let sync = channel. pipeline. syncOperations
632+ sync. write ( NIOAny ( SSHChannelData ( type: . channel, data: . byteBuffer( buffer) ) ) , promise: nil )
633+ sync. write ( NIOAny ( SSHChannelData ( type: . stdErr, data: . byteBuffer( buffer) ) ) , promise: nil )
634+ sync. flush ( )
633635 return channel. eventLoop. makeSucceededFuture ( ( ) )
634636 }
635637
@@ -709,8 +711,8 @@ final class ChildChannelMultiplexerTests: XCTestCase {
709711
710712 // Let's create a channel.
711713 harness. multiplexer. createChildChannel ( channelType: . session) { channel, _ in
712- channel. setOption ( ChannelOptions . autoRead, value: false ) . flatMap {
713- channel. pipeline. addHandler ( readRecorder. value)
714+ channel. setOption ( ChannelOptions . autoRead, value: false ) . flatMapThrowing {
715+ try channel. pipeline. syncOperations . addHandler ( readRecorder. value)
714716 }
715717 }
716718
@@ -774,7 +776,9 @@ final class ChildChannelMultiplexerTests: XCTestCase {
774776 var childChannels : [ Channel ] = [ ]
775777 let harness = self . harness { channel, _ in
776778 childChannels. append ( channel)
777- return channel. pipeline. addHandler ( ChannelInactiveRecorder ( ) )
779+ return channel. eventLoop. makeCompletedFuture {
780+ try channel. pipeline. syncOperations. addHandler ( ChannelInactiveRecorder ( ) )
781+ }
778782 }
779783 defer {
780784 harness. finish ( )
@@ -807,7 +811,9 @@ final class ChildChannelMultiplexerTests: XCTestCase {
807811 var childChannels : [ Channel ] = [ ]
808812 let harness = self . harness { channel, _ in
809813 childChannels. append ( channel)
810- return channel. pipeline. addHandler ( ChannelInactiveRecorder ( ) )
814+ return channel. eventLoop. makeCompletedFuture {
815+ try channel. pipeline. syncOperations. addHandler ( ChannelInactiveRecorder ( ) )
816+ }
811817 }
812818 defer {
813819 harness. finish ( )
@@ -836,7 +842,9 @@ final class ChildChannelMultiplexerTests: XCTestCase {
836842 // All channels are already inactive, but still have their inactive recorder (and so have not seen an event loop tick).
837843 XCTAssertTrue ( childChannels. allSatisfy { !$0. isActive } )
838844 XCTAssertTrue (
839- childChannels. allSatisfy { ( try ? $0. pipeline. handler ( type: ChannelInactiveRecorder . self) . wait ( ) ) != nil }
845+ childChannels. allSatisfy {
846+ ( try ? $0. pipeline. syncOperations. handler ( type: ChannelInactiveRecorder . self) ) != nil
847+ }
840848 )
841849
842850 // Claim the parent has gone inactive. All should go inactive.
@@ -846,7 +854,9 @@ final class ChildChannelMultiplexerTests: XCTestCase {
846854 harness. eventLoop. run ( )
847855 XCTAssertTrue ( childChannels. allSatisfy { !$0. isActive } )
848856 XCTAssertTrue (
849- childChannels. allSatisfy { ( try ? $0. pipeline. handler ( type: ChannelInactiveRecorder . self) . wait ( ) ) == nil }
857+ childChannels. allSatisfy {
858+ ( try ? $0. pipeline. syncOperations. handler ( type: ChannelInactiveRecorder . self) ) == nil
859+ }
850860 )
851861
852862 // And they didn't say anything.
@@ -964,10 +974,12 @@ final class ChildChannelMultiplexerTests: XCTestCase {
964974
965975 // Let's create a channel.
966976 harness. multiplexer. createChildChannel ( channelType: . session) { channel, _ in
967- channel. setOption ( ChannelOptions . autoRead, value: false ) . flatMap {
968- channel. setOption ( ChannelOptions . allowRemoteHalfClosure, value: true )
969- } . flatMap {
970- channel. pipeline. addHandlers ( [ readRecorder. value, eofRecorder. value] )
977+ channel. eventLoop. makeCompletedFuture {
978+ // SSH child channel supports sync options so '!' is okay.
979+ try channel. syncOptions!. setOption ( . autoRead, value: false )
980+ try channel. syncOptions!. setOption ( . allowRemoteHalfClosure, value: true )
981+ try channel. pipeline. syncOperations. addHandler ( readRecorder. value)
982+ try channel. pipeline. syncOperations. addHandler ( eofRecorder. value)
971983 }
972984 }
973985
@@ -1010,7 +1022,9 @@ final class ChildChannelMultiplexerTests: XCTestCase {
10101022 var childChannel : Channel ?
10111023 harness. multiplexer. createChildChannel ( channelType: . session) { channel, _ in
10121024 childChannel = channel
1013- return channel. pipeline. addHandler ( inactiveRecorder. value)
1025+ return channel. eventLoop. makeCompletedFuture {
1026+ try channel. pipeline. syncOperations. addHandler ( inactiveRecorder. value)
1027+ }
10141028 }
10151029
10161030 guard let channel = childChannel else {
@@ -1031,7 +1045,7 @@ final class ChildChannelMultiplexerTests: XCTestCase {
10311045 buffer. writeString ( " Hello from the unit tests! " )
10321046
10331047 for _ in 0 ..< 5 {
1034- channel. write ( SSHChannelData ( type: . channel, data: . byteBuffer( buffer) ) , promise: nil )
1048+ channel. pipeline . write ( SSHChannelData ( type: . channel, data: . byteBuffer( buffer) ) , promise: nil )
10351049 }
10361050
10371051 // Now we're going to add a final write: this will have a write promise. It should complete before
@@ -1076,7 +1090,9 @@ final class ChildChannelMultiplexerTests: XCTestCase {
10761090 var childChannel : Channel ?
10771091 harness. multiplexer. createChildChannel ( channelType: . session) { channel, _ in
10781092 childChannel = channel
1079- return channel. pipeline. addHandler ( readRecorder)
1093+ return channel. eventLoop. makeCompletedFuture {
1094+ try channel. pipeline. syncOperations. addHandler ( readRecorder)
1095+ }
10801096 }
10811097
10821098 guard let channel = childChannel else {
@@ -1214,15 +1230,19 @@ final class ChildChannelMultiplexerTests: XCTestCase {
12141230 buffer. writeBytes ( 0 ..< 6 )
12151231
12161232 // Ok, send 3 bytes of data. Nothing happens. However, when this completes writability will still be false.
1217- channel. write (
1218- SSHChannelData ( type: . channel, data: . byteBuffer( buffer. getSlice ( at: buffer. readerIndex, length: 3 ) !) ) ,
1233+ channel. pipeline. syncOperations. write (
1234+ NIOAny (
1235+ SSHChannelData ( type: . channel, data: . byteBuffer( buffer. getSlice ( at: buffer. readerIndex, length: 3 ) !) )
1236+ ) ,
12191237 promise: nil
12201238 )
12211239 XCTAssertTrue ( channel. isWritable)
12221240
12231241 // Now write 2 bytes of stderr. This flips the writability to false.
1224- channel. write (
1225- SSHChannelData ( type: . channel, data: . byteBuffer( buffer. getSlice ( at: buffer. readerIndex, length: 2 ) !) ) ,
1242+ channel. pipeline. syncOperations. write (
1243+ NIOAny (
1244+ SSHChannelData ( type: . channel, data: . byteBuffer( buffer. getSlice ( at: buffer. readerIndex, length: 2 ) !) )
1245+ ) ,
12261246 promise: nil
12271247 )
12281248 XCTAssertFalse ( channel. isWritable)
@@ -1234,7 +1254,10 @@ final class ChildChannelMultiplexerTests: XCTestCase {
12341254 XCTAssertEqual ( harness. flushedMessages. count, 3 )
12351255
12361256 // Another attempt at writing queues the write.
1237- channel. writeAndFlush ( SSHChannelData ( type: . channel, data: . byteBuffer( buffer) ) , promise: nil )
1257+ channel. pipeline. syncOperations. writeAndFlush (
1258+ NIOAny ( SSHChannelData ( type: . channel, data: . byteBuffer( buffer) ) ) ,
1259+ promise: nil
1260+ )
12381261 XCTAssertFalse ( channel. isWritable)
12391262 XCTAssertEqual ( harness. flushedMessages. count, 3 )
12401263
@@ -1303,15 +1326,19 @@ final class ChildChannelMultiplexerTests: XCTestCase {
13031326
13041327 // Ok, we're gonna write the first 4 bytes. The channel will stay writable.
13051328 XCTAssertTrue ( channel. isWritable)
1306- channel. writeAndFlush (
1307- SSHChannelData ( type: . channel, data: . byteBuffer( buffer. getSlice ( at: buffer. readerIndex, length: 4 ) !) ) ,
1329+ channel. pipeline. syncOperations. writeAndFlush (
1330+ NIOAny (
1331+ SSHChannelData ( type: . channel, data: . byteBuffer( buffer. getSlice ( at: buffer. readerIndex, length: 4 ) !) )
1332+ ) ,
13081333 promise: nil
13091334 )
13101335 XCTAssertTrue ( channel. isWritable)
13111336
13121337 // The next byte makes the channel not writable.
1313- channel. write (
1314- SSHChannelData ( type: . channel, data: . byteBuffer( buffer. getSlice ( at: buffer. readerIndex, length: 1 ) !) ) ,
1338+ channel. pipeline. syncOperations. write (
1339+ NIOAny (
1340+ SSHChannelData ( type: . channel, data: . byteBuffer( buffer. getSlice ( at: buffer. readerIndex, length: 1 ) !) )
1341+ ) ,
13151342 promise: nil
13161343 )
13171344 XCTAssertFalse ( channel. isWritable)
@@ -1576,7 +1603,10 @@ final class ChildChannelMultiplexerTests: XCTestCase {
15761603 buffer. writeBytes ( 0 ..< 6 )
15771604
15781605 // Ok, send 6 bytes of data immediately. The writability is false.
1579- channel. writeAndFlush ( SSHChannelData ( type: . channel, data: . byteBuffer( buffer) ) , promise: nil )
1606+ channel. pipeline. syncOperations. writeAndFlush (
1607+ NIOAny ( SSHChannelData ( type: . channel, data: . byteBuffer( buffer) ) ) ,
1608+ promise: nil
1609+ )
15801610 XCTAssertFalse ( channel. isWritable)
15811611
15821612 // Two writes should have occurred, one of size 3 and one of size 2.
@@ -1598,7 +1628,10 @@ final class ChildChannelMultiplexerTests: XCTestCase {
15981628 XCTAssertEqual ( harness. flushedMessages. count, 3 )
15991629
16001630 // Issue another write, now of extended data, which is also bound by this limit. Again, nothing changes.
1601- channel. writeAndFlush ( SSHChannelData ( type: . stdErr, data: . byteBuffer( buffer) ) , promise: nil )
1631+ channel. pipeline. syncOperations. writeAndFlush (
1632+ NIOAny ( SSHChannelData ( type: . stdErr, data: . byteBuffer( buffer) ) ) ,
1633+ promise: nil
1634+ )
16021635 XCTAssertFalse ( channel. isWritable)
16031636 XCTAssertEqual ( harness. flushedMessages. count, 3 )
16041637
@@ -1670,7 +1703,10 @@ final class ChildChannelMultiplexerTests: XCTestCase {
16701703 buffer. writeBytes ( 0 ..< 5 )
16711704
16721705 // Ok, we're gonna write 5 bytes. These will be split into two writes.
1673- channel. writeAndFlush ( SSHChannelData ( type: . channel, data: . byteBuffer( buffer) ) , promise: nil )
1706+ channel. pipeline. syncOperations. writeAndFlush (
1707+ NIOAny ( SSHChannelData ( type: . channel, data: . byteBuffer( buffer) ) ) ,
1708+ promise: nil
1709+ )
16741710 XCTAssertEqual ( harness. flushedMessages. count, 3 )
16751711 self . assertChannelData (
16761712 harness. flushedMessage ( 1 ) ,
@@ -1887,9 +1923,12 @@ final class ChildChannelMultiplexerTests: XCTestCase {
18871923
18881924 let harness = self . harness { channel, type in
18891925 initializedChannels. append ( type)
1890-
1891- return channel. getOption ( SSHChildChannelOptions . sshChannelType) . map { type in
1926+ do {
1927+ let type = try channel. syncOptions! . getOption ( SSHChildChannelOptions . sshChannelType)
18921928 typesFromOptions. append ( type)
1929+ return channel. eventLoop. makeSucceededVoidFuture ( )
1930+ } catch {
1931+ return channel. eventLoop. makeFailedFuture ( error)
18931932 }
18941933 }
18951934
@@ -1930,7 +1969,9 @@ final class ChildChannelMultiplexerTests: XCTestCase {
19301969 let readCounter = ReadCountingHandler ( )
19311970
19321971 let harness = self . harness { channel, _ in
1933- channel. pipeline. addHandler ( readCounter)
1972+ channel. eventLoop. makeCompletedFuture {
1973+ try channel. pipeline. syncOperations. addHandler ( readCounter)
1974+ }
19341975 }
19351976 defer {
19361977 harness. finish ( )
@@ -1984,10 +2025,13 @@ final class ChildChannelMultiplexerTests: XCTestCase {
19842025 let readRecorder = ReadRecordingHandler ( )
19852026
19862027 let harness = self . harness { channel, _ in
1987- channel. setOption ( ChannelOptions . autoRead, value: true ) . flatMap {
1988- channel. setOption ( ChannelOptions . allowRemoteHalfClosure, value: true )
1989- } . flatMap {
1990- channel. pipeline. addHandlers ( [ readRecorder, eofHandler] )
2028+ channel. eventLoop. makeCompletedFuture {
2029+ let options = channel. syncOptions!
2030+ try options. setOption ( . autoRead, value: true )
2031+ try options. setOption ( . allowRemoteHalfClosure, value: true )
2032+ let sync = channel. pipeline. syncOperations
2033+ try sync. addHandler ( readRecorder)
2034+ try sync. addHandler ( eofHandler)
19912035 }
19922036 }
19932037 defer {
@@ -2020,10 +2064,13 @@ final class ChildChannelMultiplexerTests: XCTestCase {
20202064 let readRecorder = ReadRecordingHandler ( )
20212065
20222066 let harness = self . harness { channel, _ in
2023- channel. setOption ( ChannelOptions . autoRead, value: true ) . flatMap {
2024- channel. setOption ( ChannelOptions . allowRemoteHalfClosure, value: true )
2025- } . flatMap {
2026- channel. pipeline. addHandlers ( [ readRecorder, eofHandler] )
2067+ channel. eventLoop. makeCompletedFuture {
2068+ let options = channel. syncOptions!
2069+ try options. setOption ( . autoRead, value: true )
2070+ try options. setOption ( . allowRemoteHalfClosure, value: true )
2071+ let sync = channel. pipeline. syncOperations
2072+ try sync. addHandler ( readRecorder)
2073+ try sync. addHandler ( eofHandler)
20272074 }
20282075 }
20292076 defer {
0 commit comments