2424import java .nio .ByteBuffer ;
2525import java .time .Duration ;
2626import java .util .ArrayList ;
27+ import java .util .Arrays ;
2728import java .util .List ;
2829import java .util .concurrent .CountDownLatch ;
2930import java .util .concurrent .ExecutorService ;
@@ -88,6 +89,20 @@ public class EndOfFragmentIntegTest extends ProducerTestBase {
8889 private static final int NUMBER_OF_FRAMES_TO_STREAM = 10 ;
8990 private static final int FPS = 5 ;
9091 private static final int KEYFRAME_INTERVAL = 5 ;
92+ private static final int SHUTDOWN_TIMEOUT_MS = 5000 ; //total time allowed for clean up
93+ private static final int INTERVAL_MS = 100 ; //time interval between each clean up
94+
95+ /**
96+ * Names of the threads that exist before the test.
97+ */
98+ private List <String > threadsBefore ;
99+
100+ /**
101+ * List of thread names to ignore.
102+ */
103+ private List <String > threadsToIgnore = Arrays .asList (
104+ "java-sdk-http-connection-reaper" // Owned by AWS SDK Java
105+ );
91106
92107 /**
93108 * List of streams created during tests that need to be cleaned up.
@@ -113,6 +128,14 @@ public void setUp() {
113128 assumeTrue ("You need to increase the number of streams the client is configured for before starting the test!" ,
114129 NUMBER_OF_STREAMS_PER_ITERATION <= NUMBER_OF_STREAMS );
115130
131+ // Capture baseline thread state
132+ this .threadsBefore = Thread .getAllStackTraces ().keySet ()
133+ .stream ()
134+ .map (Thread ::getName )
135+ .collect (Collectors .toList ());
136+
137+ threadsBefore .sort (String .CASE_INSENSITIVE_ORDER );
138+
116139 createProducer ();
117140 }
118141
@@ -150,6 +173,44 @@ public void tearDown() {
150173 this .createdStreams .clear ();
151174
152175 assertFalse ("An exception happened during cleanup!" , failure );
176+
177+ freeProducer ();
178+ threadsBefore .removeAll (threadsToIgnore );
179+ for (int i = 0 ; i < SHUTDOWN_TIMEOUT_MS ; i += INTERVAL_MS ) {
180+ final List <String > threadsNow = Thread .getAllStackTraces ().keySet ()
181+ .stream ()
182+ .map (Thread ::getName )
183+ .collect (Collectors .toList ());
184+
185+ threadsNow .sort (String .CASE_INSENSITIVE_ORDER );
186+ threadsNow .removeAll (threadsToIgnore );
187+ log .info ("Cleanup iteration {}/() ms - Current thread count: {}, Expected: {}" ,
188+ i , SHUTDOWN_TIMEOUT_MS , threadsNow .size (), this .threadsBefore .size ());
189+
190+ if (threadsNow .equals (this .threadsBefore )) {
191+ break ; // threads are cleaned up
192+ } else {
193+ //if threads are not clearned up yet
194+ List <String > extraThreads = new ArrayList <>(threadsNow );
195+ extraThreads .removeAll (this .threadsBefore );
196+ if (!extraThreads .isEmpty ()) {
197+ log .warn ("extra threads are still running: {}" , extraThreads );
198+ }
199+ }
200+
201+ if (i + INTERVAL_MS >= SHUTDOWN_TIMEOUT_MS ) {
202+ //time has exceeded shutdown timeout
203+ log .error ("Expected threads: {}" , this .threadsBefore );
204+ log .error ("Current threads: {}" , threadsNow );
205+ fail ("Timeout waiting for threads to be cleaned up properly" );
206+ }
207+
208+ try {
209+ Thread .sleep (INTERVAL_MS );
210+ } catch (InterruptedException e ) {
211+ Thread .currentThread ().interrupt ();
212+ }
213+ }
153214 }
154215
155216 // Using this as a way to repeat the test multiple times
@@ -278,6 +339,8 @@ public void test_When_StoppingNearIntermittentProducer_Then_StreamingSuccessfull
278339 .count ();
279340
280341 assertTrue ("Didn't receive any PERSISTED ACKs. Received: " + this .receivedFragmentAcks_ , persistedAcksCount > 0 );
342+
343+ executorService .shutdownNow ();
281344 }
282345
283346 /**
0 commit comments