33import com .amazonaws .kinesisvideo .client .IPVersionFilter ;
44import com .amazonaws .kinesisvideo .client .KinesisVideoClientConfigurationDefaults ;
55import com .amazonaws .kinesisvideo .common .function .Consumer ;
6+ import com .amazonaws .kinesisvideo .common .preconditions .Preconditions ;
67import com .amazonaws .kinesisvideo .socket .SocketFactory ;
78import com .amazonaws .kinesisvideo .util .LoggedExitRunnable ;
89import com .google .common .util .concurrent .ThreadFactoryBuilder ;
910import org .apache .logging .log4j .LogManager ;
1011import org .apache .logging .log4j .Logger ;
1112
13+ import javax .annotation .Nonnull ;
14+ import javax .annotation .Nullable ;
1215import java .io .BufferedWriter ;
1316import java .io .IOException ;
1417import java .io .InputStream ;
1821import java .net .Socket ;
1922import java .net .URI ;
2023import java .nio .charset .Charset ;
24+ import java .util .ArrayList ;
2125import java .util .HashMap ;
26+ import java .util .List ;
2227import java .util .Map ;
2328import java .util .UUID ;
2429import java .util .concurrent .ExecutorService ;
2530import java .util .concurrent .Executors ;
31+ import java .util .concurrent .TimeUnit ;
2632
2733import static com .amazonaws .kinesisvideo .common .preconditions .Preconditions .checkNotNull ;
2834
2935public final class ParallelSimpleHttpClient implements HttpClient {
36+
37+ private static final int AWAIT_THREAD_TERMINATE_SECS = 3 ;
38+
3039 private static final String SPACE = " " ;
3140 private static final String CLRF = "\r \n " ;
3241 private static final String HTTP_1_1 = "HTTP/1.1" ;
@@ -51,6 +60,34 @@ public void accept(final Exception object) {
5160 private OutputStream mOutputStream ;
5261 private ExecutorService payloadSender ;
5362 private ExecutorService responseReceiver ;
63+ private final List <ExitResult > exitHistory = new ArrayList <>();
64+
65+ private enum Caller {
66+ SENDER ,
67+ RECEIVER ,
68+ CLOSE
69+ }
70+
71+ private static class ExitResult {
72+ @ Nonnull
73+ private Caller caller ;
74+
75+ @ Nullable
76+ private Exception exception ;
77+
78+ ExitResult (@ Nonnull final Caller caller , @ Nullable final Exception exception ) {
79+ this .caller = caller ;
80+ this .exception = exception ;
81+ }
82+
83+ @ Override
84+ public String toString () {
85+ return "ExitResult{" +
86+ "caller=" + caller +
87+ ", exception=" + exception +
88+ '}' ;
89+ }
90+ }
5491
5592 private ParallelSimpleHttpClient (final Builder builder ) {
5693 mBuilder = builder ;
@@ -157,10 +194,7 @@ public void execute() {
157194 log .error ("[{}] Exception thrown on sending thread" , mBuilder .mStreamName , e );
158195 storedException = e ;
159196 } finally {
160- //Only call completion if there is an exception, otherwise sender will call completion
161- if (storedException != null ) {
162- mBuilder .mCompletion .accept (storedException );
163- }
197+ notifyCompletionCallback (new ExitResult (Caller .SENDER , storedException ));
164198 payloadSender .shutdownNow ();
165199 }
166200 }
@@ -184,7 +218,7 @@ public void execute() {
184218 log .error ("[{}] Exception thrown on receiving thread" , mBuilder .mStreamName , e );
185219 storedException = e ;
186220 } finally {
187- mBuilder . mCompletion . accept ( storedException );
221+ notifyCompletionCallback ( new ExitResult ( Caller . RECEIVER , storedException ) );
188222 responseReceiver .shutdownNow ();
189223 closeSocket ();
190224 }
@@ -245,7 +279,83 @@ public void close() throws IOException {
245279 payloadSender .shutdownNow ();
246280 responseReceiver .shutdownNow ();
247281 closeSocket ();
248- mBuilder .mCompletion .accept (null );
282+
283+ awaitTryShutdownThreads ();
284+
285+ notifyCompletionCallback (new ExitResult (Caller .CLOSE , null ));
286+ }
287+
288+ // This is used to synchronize the 3 threads which call the completion callback:
289+ // - Sender thread
290+ // - Receiving ACKs thread
291+ // - Thread calling close()
292+ // If close() is called, it will immediately invoke the completion callback with success.
293+ // Otherwise, it will wait for both sender and receiver threads to exit before notifying.
294+ // If applicable, the thread that threw the exception first's result will be propagated.
295+ private void notifyCompletionCallback (@ Nonnull final ExitResult exitResult ) {
296+ // Note: the thread name should already have the stream name + connection handle # in it
297+ log .debug ("Received: {}" , exitResult );
298+
299+ if (mBuilder .mCompletion != null ) {
300+
301+ if (exitResult .caller == Caller .CLOSE ) {
302+ mBuilder .mCompletion .accept (null );
303+ return ;
304+ }
305+
306+ Exception exceptionToNotify = null ;
307+ boolean notify = false ;
308+ synchronized (this .exitHistory ) {
309+ this .exitHistory .add (exitResult );
310+
311+ if (this .exitHistory .size () == 2 &&
312+ ((this .exitHistory .get (0 ).caller == Caller .SENDER && this .exitHistory .get (1 ).caller == Caller .RECEIVER ) ||
313+ (this .exitHistory .get (0 ).caller == Caller .RECEIVER && this .exitHistory .get (1 ).caller == Caller .SENDER ))
314+ ) {
315+ // Check if either one of them exited with an exception
316+ // If so, propagate it. If both of them terminated normally, notify with null
317+ notify = true ;
318+
319+ // prioritize the exception that came first
320+ exceptionToNotify = this .exitHistory .get (0 ).exception ;
321+ if (exceptionToNotify == null ) {
322+ exceptionToNotify = this .exitHistory .get (1 ).exception ;
323+ }
324+ } else {
325+ log .debug ("Not notifying this time, caller history: {}" , this .exitHistory );
326+ }
327+ }
328+
329+ if (notify ) {
330+ log .debug ("[{}] notifying completion callback with {}" , mBuilder .mStreamName , exceptionToNotify );
331+ mBuilder .mCompletion .accept (exceptionToNotify );
332+ }
333+ }
334+ }
335+
336+ // Wait for the threads to terminate
337+ // If the threads are not alive, returns immediately
338+ // Expecting these to be near instantaneous
339+ private void awaitTryShutdownThreads () {
340+ awaitTermination (this .payloadSender , "payload sender" , AWAIT_THREAD_TERMINATE_SECS );
341+ awaitTermination (this .responseReceiver , "response receiver" , AWAIT_THREAD_TERMINATE_SECS );
342+ }
343+
344+ @ SuppressWarnings ("ConstantConditions" )
345+ private void awaitTermination (@ Nonnull final ExecutorService executor , @ Nonnull final String id ,
346+ final int threadTerminateTimeoutSeconds ) {
347+ Preconditions .checkArgument (executor != null , "Executor cannot be null" );
348+ Preconditions .checkArgument (id != null , "ID cannot be null" );
349+ Preconditions .checkArgument (threadTerminateTimeoutSeconds >= 0 , "ThreadTerminateTimeoutSeconds must be positive" );
350+
351+ try {
352+ if (!executor .awaitTermination (AWAIT_THREAD_TERMINATE_SECS , TimeUnit .SECONDS )) {
353+ log .error ("{}: {} couldn't shutdown within {} seconds" , mBuilder .mStreamName , id , AWAIT_THREAD_TERMINATE_SECS );
354+ }
355+ } catch (final InterruptedException e ) {
356+ log .error ("{}: Interrupted while waiting for {} shutdown" , mBuilder .mStreamName , id , e );
357+ Thread .currentThread ().interrupt ();
358+ }
249359 }
250360
251361
0 commit comments