1313import org .opensearch .client .Response ;
1414import org .opensearch .client .StreamingRequest ;
1515import org .opensearch .client .StreamingResponse ;
16+ import org .opensearch .common .settings .Settings ;
1617import org .opensearch .test .rest .OpenSearchRestTestCase ;
1718import org .junit .After ;
1819
1920import java .io .IOException ;
2021import java .io .InterruptedIOException ;
2122import java .io .UncheckedIOException ;
23+ import java .lang .management .ManagementFactory ;
24+ import java .lang .management .ThreadMXBean ;
2225import java .nio .ByteBuffer ;
2326import java .nio .charset .StandardCharsets ;
2427import java .time .Duration ;
28+ import java .util .Arrays ;
29+ import java .util .Objects ;
30+ import java .util .concurrent .TimeUnit ;
2531import java .util .concurrent .atomic .AtomicInteger ;
32+ import java .util .function .Supplier ;
2633import java .util .stream .Stream ;
2734
2835import reactor .core .publisher .Flux ;
@@ -44,7 +51,11 @@ public void tearDown() throws Exception {
4451 super .tearDown ();
4552 }
4653
47- @ AwaitsFix (bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15840" )
54+ @ Override
55+ protected Settings restClientSettings () {
56+ return Settings .builder ().put (super .restClientSettings ()).put (CLIENT_SOCKET_TIMEOUT , "5s" ).build ();
57+ }
58+
4859 public void testCloseClientStreamingRequest () throws Exception {
4960 final VirtualTimeScheduler scheduler = VirtualTimeScheduler .create (true );
5061
@@ -67,11 +78,28 @@ public void testCloseClientStreamingRequest() throws Exception {
6778 final StreamingResponse <ByteBuffer > streamingResponse = client ().streamRequest (streamingRequest );
6879 scheduler .advanceTimeBy (delay ); /* emit first element */
6980
81+ final ThreadMXBean threadMXBean = ManagementFactory .getThreadMXBean ();
82+ final Supplier <Long > captureHttpClientThreads = () -> Arrays .stream (threadMXBean .getAllThreadIds ())
83+ .mapToObj (tid -> threadMXBean .getThreadInfo (tid ))
84+ .filter (Objects ::nonNull )
85+ .filter (t -> t .getThreadName ().startsWith ("httpclient-main-" ))
86+ .count ();
87+
7088 StepVerifier .create (Flux .from (streamingResponse .getBody ()).map (b -> new String (b .array (), StandardCharsets .UTF_8 )))
7189 .expectNextMatches (s -> s .contains ("\" result\" :\" created\" " ) && s .contains ("\" _id\" :\" 1\" " ))
7290 .then (() -> {
7391 try {
7492 client ().close ();
93+
94+ // Await for the non-admin client to fully shutdown
95+ final long await = TimeUnit .SECONDS .toNanos (5 );
96+ final long started = System .nanoTime ();
97+ while (captureHttpClientThreads .get () > 1 ) {
98+ Thread .onSpinWait ();
99+ if ((System .nanoTime () - started ) > await ) {
100+ throw new InterruptedIOException ("The client is still shutting down" );
101+ }
102+ }
75103 } catch (final IOException ex ) {
76104 throw new UncheckedIOException (ex );
77105 }
0 commit comments