|
80 | 80 | import org.apache.pulsar.broker.PulsarService;
|
81 | 81 | import org.apache.pulsar.broker.ServiceConfiguration;
|
82 | 82 | import org.apache.pulsar.broker.TransactionMetadataStoreService;
|
| 83 | +import org.apache.pulsar.broker.auth.MockAuthenticationProvider; |
| 84 | +import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider; |
83 | 85 | import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
|
84 | 86 | import org.apache.pulsar.broker.authentication.AuthenticationProvider;
|
85 | 87 | import org.apache.pulsar.broker.authentication.AuthenticationService;
|
|
103 | 105 | import org.apache.pulsar.common.api.proto.CommandAck.AckType;
|
104 | 106 | import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
|
105 | 107 | import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
|
| 108 | +import org.apache.pulsar.common.api.proto.CommandAuthChallenge; |
106 | 109 | import org.apache.pulsar.common.api.proto.CommandAuthResponse;
|
107 | 110 | import org.apache.pulsar.common.api.proto.CommandConnected;
|
108 | 111 | import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
|
@@ -450,11 +453,84 @@ public void testConnectCommandWithAuthenticationNegative() throws Exception {
|
450 | 453 | ByteBuf clientCommand = Commands.newConnect("none", "", null);
|
451 | 454 | channel.writeInbound(clientCommand);
|
452 | 455 |
|
453 |
| - assertEquals(serverCnx.getState(), State.Start); |
| 456 | + assertEquals(serverCnx.getState(), State.Failed); |
454 | 457 | assertTrue(getResponse() instanceof CommandError);
|
455 | 458 | channel.finish();
|
456 | 459 | }
|
457 | 460 |
|
| 461 | + @Test(timeOut = 30000) |
| 462 | + public void testConnectCommandWithFailingOriginalAuthData() throws Exception { |
| 463 | + AuthenticationService authenticationService = mock(AuthenticationService.class); |
| 464 | + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); |
| 465 | + String authMethodName = authenticationProvider.getAuthMethodName(); |
| 466 | + |
| 467 | + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| 468 | + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| 469 | + svcConfig.setAuthenticationEnabled(true); |
| 470 | + svcConfig.setAuthenticateOriginalAuthData(true); |
| 471 | + svcConfig.setProxyRoles(Collections.singleton("proxy")); |
| 472 | + |
| 473 | + resetChannel(); |
| 474 | + assertTrue(channel.isActive()); |
| 475 | + assertEquals(serverCnx.getState(), State.Start); |
| 476 | + |
| 477 | + ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1,null, |
| 478 | + null, "client", "fail", authMethodName); |
| 479 | + channel.writeInbound(clientCommand); |
| 480 | + |
| 481 | + // We currently expect two responses because the originalAuthData is verified after sending |
| 482 | + // a successful response to the proxy. Because this is a synchronous operation, there is currently |
| 483 | + // no risk. It would be better to fix this. See https://github.com/apache/pulsar/issues/19311. |
| 484 | + Object response1 = getResponse(); |
| 485 | + assertTrue(response1 instanceof CommandConnected); |
| 486 | + Object response2 = getResponse(); |
| 487 | + assertTrue(response2 instanceof CommandError); |
| 488 | + assertEquals(((CommandError) response2).getMessage(), "Unable to authenticate"); |
| 489 | + assertEquals(serverCnx.getState(), State.Failed); |
| 490 | + assertFalse(serverCnx.isActive()); |
| 491 | + channel.finish(); |
| 492 | + } |
| 493 | + |
| 494 | + @Test(timeOut = 30000) |
| 495 | + public void testAuthResponseWithFailingAuthData() throws Exception { |
| 496 | + AuthenticationService authenticationService = mock(AuthenticationService.class); |
| 497 | + AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider(); |
| 498 | + String authMethodName = authenticationProvider.getAuthMethodName(); |
| 499 | + |
| 500 | + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| 501 | + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| 502 | + svcConfig.setAuthenticationEnabled(true); |
| 503 | + |
| 504 | + resetChannel(); |
| 505 | + assertTrue(channel.isActive()); |
| 506 | + assertEquals(serverCnx.getState(), State.Start); |
| 507 | + |
| 508 | + // Trigger connect command to result in AuthChallenge |
| 509 | + ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1"); |
| 510 | + channel.writeInbound(clientCommand); |
| 511 | + |
| 512 | + Object challenge1 = getResponse(); |
| 513 | + assertTrue(challenge1 instanceof CommandAuthChallenge); |
| 514 | + |
| 515 | + // Trigger another AuthChallenge to verify that code path continues to challenge |
| 516 | + ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()), 1, "1"); |
| 517 | + channel.writeInbound(authResponse1); |
| 518 | + |
| 519 | + Object challenge2 = getResponse(); |
| 520 | + assertTrue(challenge2 instanceof CommandAuthChallenge); |
| 521 | + |
| 522 | + // Trigger failure |
| 523 | + ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("fail.client".getBytes()), 1, "1"); |
| 524 | + channel.writeInbound(authResponse2); |
| 525 | + |
| 526 | + Object response3 = getResponse(); |
| 527 | + assertTrue(response3 instanceof CommandError); |
| 528 | + assertEquals(((CommandError) response3).getMessage(), "Unable to authenticate"); |
| 529 | + assertEquals(serverCnx.getState(), State.Failed); |
| 530 | + assertFalse(serverCnx.isActive()); |
| 531 | + channel.finish(); |
| 532 | + } |
| 533 | + |
458 | 534 | @Test(timeOut = 30000)
|
459 | 535 | public void testProducerCommand() throws Exception {
|
460 | 536 | resetChannel();
|
|
0 commit comments