1919import static com .rabbitmq .stream .oauth2 .TokenCredentialsManager .DEFAULT_REFRESH_DELAY_STRATEGY ;
2020import static java .time .Duration .ofMillis ;
2121import static java .time .Duration .ofSeconds ;
22+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
2223import static java .util .stream .Collectors .toList ;
2324import static java .util .stream .IntStream .range ;
2425import static org .assertj .core .api .Assertions .assertThat ;
2526import static org .mockito .Mockito .when ;
2627
28+ import com .rabbitmq .stream .oauth2 .CredentialsManager .Registration ;
29+ import com .rabbitmq .stream .oauth2 .OAuth2TestUtils .Pair ;
2730import java .time .Duration ;
2831import java .time .Instant ;
2932import java .util .List ;
3033import java .util .concurrent .CountDownLatch ;
3134import java .util .concurrent .Executors ;
3235import java .util .concurrent .ScheduledExecutorService ;
33- import java .util .concurrent .TimeUnit ;
3436import java .util .concurrent .atomic .AtomicInteger ;
3537import java .util .function .Function ;
3638import org .junit .jupiter .api .AfterEach ;
@@ -72,22 +74,17 @@ void refreshShouldStopOnceUnregistered() throws InterruptedException {
7274 this .requester , this .scheduledExecutorService , DEFAULT_REFRESH_DELAY_STRATEGY );
7375 int expectedRefreshCount = 3 ;
7476 AtomicInteger refreshCount = new AtomicInteger ();
75- CountDownLatch refreshSync = new CountDownLatch (expectedRefreshCount );
76- CredentialsManager . Registration registration =
77+ CountDownLatch refreshLatch = new CountDownLatch (expectedRefreshCount );
78+ Registration registration =
7779 credentials .register (
7880 "" ,
7981 (u , p ) -> {
8082 refreshCount .incrementAndGet ();
81- refreshSync .countDown ();
83+ refreshLatch .countDown ();
8284 });
8385 registration .connect (connectionCallback (() -> {}));
8486 assertThat (requestCount ).hasValue (1 );
85- try {
86- assertThat (refreshSync .await (ofSeconds (10 ).toMillis (), TimeUnit .MILLISECONDS )).isTrue ();
87- } catch (InterruptedException e ) {
88- Thread .currentThread ().interrupt ();
89- throw new RuntimeException (e );
90- }
87+ assertThat (refreshLatch .await (ofSeconds (10 ).toMillis (), MILLISECONDS )).isTrue ();
9188 assertThat (requestCount ).hasValue (expectedRefreshCount + 1 );
9289 registration .close ();
9390 assertThat (refreshCount ).hasValue (expectedRefreshCount );
@@ -110,12 +107,12 @@ void severalRegistrationsShouldBeRefreshed() throws Exception {
110107 int expectedRefreshCountPerConnection = 3 ;
111108 int connectionCount = 10 ;
112109 AtomicInteger totalRefreshCount = new AtomicInteger ();
113- List <OAuth2TestUtils . Pair <CredentialsManager . Registration , CountDownLatch >> registrations =
110+ List <Pair <Registration , CountDownLatch >> registrations =
114111 range (0 , connectionCount )
115112 .mapToObj (
116113 ignored -> {
117114 CountDownLatch sync = new CountDownLatch (expectedRefreshCountPerConnection );
118- CredentialsManager . Registration r =
115+ Registration r =
119116 credentials .register (
120117 "" ,
121118 (username , password ) -> {
@@ -127,15 +124,8 @@ void severalRegistrationsShouldBeRefreshed() throws Exception {
127124 .collect (toList ());
128125
129126 registrations .forEach (r -> r .v1 ().connect (connectionCallback (() -> {})));
130- for (OAuth2TestUtils .Pair <CredentialsManager .Registration , CountDownLatch > registrationPair :
131- registrations ) {
132- try {
133- assertThat (registrationPair .v2 ().await (ofSeconds (10 ).toMillis (), TimeUnit .MILLISECONDS ))
134- .isTrue ();
135- } catch (InterruptedException e ) {
136- Thread .currentThread ().interrupt ();
137- throw new RuntimeException (e );
138- }
127+ for (Pair <Registration , CountDownLatch > registrationPair : registrations ) {
128+ assertThat (registrationPair .v2 ().await (ofSeconds (10 ).toMillis (), MILLISECONDS )).isTrue ();
139129 }
140130 // all connections have been refreshed once
141131 int refreshCountSnapshot = totalRefreshCount .get ();
0 commit comments