Skip to content

Commit 3ef3bf1

Browse files
[improve][broker] ServerCnx: go to Failed state when auth fails (apache#19312)
PIP: apache#12105 When authentication fails in the `ServerCnx`, the state is left in `Start` if the primary `authData` fails authentication and in `Connecting` or `Connected` if the `originalAuthData` authentication fails. To prevent any kind of unexpected behavior, we should go to `Failed` state. Note that the tests verify the current behavior where a failed `originalAuthData` results first in a `Connected` command from the broker and then an `Error` command. I documented that I think this is sub optimal here apache#19311. * Update `ServerCnx` state to `Failed` when there is an authentication exception during `handleConnect` and during `handleAuthResponse`. * Update `handleAuthResponse` reply to `"Unable to authenticate"` instead of the `AuthenticationState` exception. A new test is added. The added test covers the change made in apache#19295 where we updated `ServerCnx` so that we call `AuthState#authenticate` instead of relying on the implementation detail that the initialization calls `authenticate`. That PR should have added a test. This is not a breaking change. - [x] `doc-not-needed` PR in forked repository: #18 (cherry picked from commit 8049690)
1 parent 8246da2 commit 3ef3bf1

File tree

5 files changed

+206
-1
lines changed

5 files changed

+206
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+3
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,7 @@ protected void handleConnect(CommandConnect connect) {
915915
doAuthentication(clientData, false, clientProtocolVersion, clientVersion);
916916
} catch (Exception e) {
917917
service.getPulsarStats().recordConnectionCreateFail();
918+
state = State.Failed;
918919
logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
919920
String msg = "Unable to authenticate";
920921
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
@@ -940,11 +941,13 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
940941
authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY);
941942
} catch (AuthenticationException e) {
942943
service.getPulsarStats().recordConnectionCreateFail();
944+
state = State.Failed;
943945
log.warn("[{}] Authentication failed: {} ", remoteAddress, e.getMessage());
944946
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, e.getMessage()));
945947
close();
946948
} catch (Exception e) {
947949
service.getPulsarStats().recordConnectionCreateFail();
950+
state = State.Failed;
948951
String msg = "Unable to handleAuthResponse";
949952
log.warn("[{}] {} ", remoteAddress, msg, e);
950953
ctx.writeAndFlush(Commands.newError(-1, ServerError.UnknownError, msg));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.auth;
20+
21+
import javax.naming.AuthenticationException;
22+
import javax.net.ssl.SSLSession;
23+
import java.net.SocketAddress;
24+
import org.apache.pulsar.broker.authentication.AuthenticationState;
25+
import org.apache.pulsar.common.api.AuthData;
26+
27+
/**
28+
* Class that provides the same authentication semantics as the {@link MockAuthenticationProvider} except
29+
* that this one initializes the {@link MockMultiStageAuthenticationState} class to support testing
30+
* multistage authentication.
31+
*/
32+
public class MockMultiStageAuthenticationProvider extends MockAuthenticationProvider {
33+
34+
@Override
35+
public String getAuthMethodName() {
36+
return "multi-stage";
37+
}
38+
39+
@Override
40+
public AuthenticationState newAuthState(AuthData authData,
41+
SocketAddress remoteAddress,
42+
SSLSession sslSession) throws AuthenticationException {
43+
return new MockMultiStageAuthenticationState(this);
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.auth;
20+
21+
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
22+
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
23+
import org.apache.pulsar.broker.authentication.AuthenticationState;
24+
import org.apache.pulsar.common.api.AuthData;
25+
26+
import javax.naming.AuthenticationException;
27+
28+
import static java.nio.charset.StandardCharsets.UTF_8;
29+
30+
/**
31+
* Performs multistage authentication by extending the paradigm created in {@link MockAuthenticationProvider}.
32+
*/
33+
public class MockMultiStageAuthenticationState implements AuthenticationState {
34+
35+
private final MockMultiStageAuthenticationProvider provider;
36+
private String authRole = null;
37+
38+
MockMultiStageAuthenticationState(MockMultiStageAuthenticationProvider provider) {
39+
this.provider = provider;
40+
}
41+
42+
@Override
43+
public String getAuthRole() throws AuthenticationException {
44+
if (authRole == null) {
45+
throw new AuthenticationException("Must authenticate first");
46+
}
47+
return null;
48+
}
49+
50+
@Override
51+
public AuthData authenticate(AuthData authData) throws AuthenticationException {
52+
String data = new String(authData.getBytes(), UTF_8);
53+
String[] parts = data.split("\\.");
54+
if (parts.length == 2) {
55+
if ("challenge".equals(parts[0])) {
56+
return AuthData.of("challenged".getBytes());
57+
} else {
58+
AuthenticationDataCommand command = new AuthenticationDataCommand(data);
59+
authRole = provider.authenticate(command);
60+
// Auth successful, no more auth required
61+
return null;
62+
}
63+
}
64+
throw new AuthenticationException("Failed to authenticate");
65+
}
66+
67+
@Override
68+
public AuthenticationDataSource getAuthDataSource() {
69+
return null;
70+
}
71+
72+
@Override
73+
public boolean isComplete() {
74+
return authRole != null;
75+
}
76+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

+76-1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.apache.pulsar.broker.ServiceConfiguration;
8181
import org.apache.pulsar.broker.TransactionMetadataStoreService;
8282
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
83+
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
8384
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
8485
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
8586
import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -103,6 +104,7 @@
103104
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
104105
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
105106
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
107+
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
106108
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
107109
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
108110
import org.apache.pulsar.common.api.proto.CommandConnected;
@@ -490,11 +492,84 @@ public void testConnectCommandWithAuthenticationNegative() throws Exception {
490492
ByteBuf clientCommand = Commands.newConnect("none", "", null);
491493
channel.writeInbound(clientCommand);
492494

493-
assertEquals(serverCnx.getState(), State.Start);
495+
assertEquals(serverCnx.getState(), State.Failed);
494496
assertTrue(getResponse() instanceof CommandError);
495497
channel.finish();
496498
}
497499

500+
@Test(timeOut = 30000)
501+
public void testConnectCommandWithFailingOriginalAuthData() throws Exception {
502+
AuthenticationService authenticationService = mock(AuthenticationService.class);
503+
AuthenticationProvider authenticationProvider = new MockAuthenticationProvider();
504+
String authMethodName = authenticationProvider.getAuthMethodName();
505+
506+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
507+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
508+
svcConfig.setAuthenticationEnabled(true);
509+
svcConfig.setAuthenticateOriginalAuthData(true);
510+
svcConfig.setProxyRoles(Collections.singleton("proxy"));
511+
512+
resetChannel();
513+
assertTrue(channel.isActive());
514+
assertEquals(serverCnx.getState(), State.Start);
515+
516+
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1,null,
517+
null, "client", "fail", authMethodName);
518+
channel.writeInbound(clientCommand);
519+
520+
// We currently expect two responses because the originalAuthData is verified after sending
521+
// a successful response to the proxy. Because this is a synchronous operation, there is currently
522+
// no risk. It would be better to fix this. See https://github.com/apache/pulsar/issues/19311.
523+
Object response1 = getResponse();
524+
assertTrue(response1 instanceof CommandConnected);
525+
Object response2 = getResponse();
526+
assertTrue(response2 instanceof CommandError);
527+
assertEquals(((CommandError) response2).getMessage(), "Unable to authenticate");
528+
assertEquals(serverCnx.getState(), State.Failed);
529+
assertFalse(serverCnx.isActive());
530+
channel.finish();
531+
}
532+
533+
@Test(timeOut = 30000)
534+
public void testAuthResponseWithFailingAuthData() throws Exception {
535+
AuthenticationService authenticationService = mock(AuthenticationService.class);
536+
AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider();
537+
String authMethodName = authenticationProvider.getAuthMethodName();
538+
539+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
540+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
541+
svcConfig.setAuthenticationEnabled(true);
542+
543+
resetChannel();
544+
assertTrue(channel.isActive());
545+
assertEquals(serverCnx.getState(), State.Start);
546+
547+
// Trigger connect command to result in AuthChallenge
548+
ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1");
549+
channel.writeInbound(clientCommand);
550+
551+
Object challenge1 = getResponse();
552+
assertTrue(challenge1 instanceof CommandAuthChallenge);
553+
554+
// Trigger another AuthChallenge to verify that code path continues to challenge
555+
ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()), 1, "1");
556+
channel.writeInbound(authResponse1);
557+
558+
Object challenge2 = getResponse();
559+
assertTrue(challenge2 instanceof CommandAuthChallenge);
560+
561+
// Trigger failure
562+
ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("fail.client".getBytes()), 1, "1");
563+
channel.writeInbound(authResponse2);
564+
565+
Object response3 = getResponse();
566+
assertTrue(response3 instanceof CommandError);
567+
assertEquals(((CommandError) response3).getMessage(), "Unable to authenticate");
568+
assertEquals(serverCnx.getState(), State.Failed);
569+
assertFalse(serverCnx.isActive());
570+
channel.finish();
571+
}
572+
498573
@Test(timeOut = 30000)
499574
public void testProducerCommand() throws Exception {
500575
resetChannel();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java

+6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Queue;
2222
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
2323
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
24+
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
2425
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
2526
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
2627
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
@@ -83,6 +84,11 @@ protected void handleConnected(CommandConnected connected) {
8384
queue.offer(new CommandConnected().copyFrom(connected));
8485
}
8586

87+
@Override
88+
protected void handleAuthChallenge(CommandAuthChallenge challenge) {
89+
queue.offer(new CommandAuthChallenge().copyFrom(challenge));
90+
}
91+
8692
@Override
8793
protected void handleSubscribe(CommandSubscribe subscribe) {
8894
queue.offer(new CommandSubscribe().copyFrom(subscribe));

0 commit comments

Comments
 (0)