Skip to content

Commit 8049690

Browse files
[improve][broker] ServerCnx: go to Failed state when auth fails (#19312)
PIP: #12105 ### Motivation When authentication fails in the `ServerCnx`, the state is left in `Start` if the primary `authData` fails authentication and in `Connecting` or `Connected` if the `originalAuthData` authentication fails. To prevent any kind of unexpected behavior, we should go to `Failed` state. Note that the tests verify the current behavior where a failed `originalAuthData` results first in a `Connected` command from the broker and then an `Error` command. I documented that I think this is sub optimal here #19311. ### Modifications * Update `ServerCnx` state to `Failed` when there is an authentication exception during `handleConnect` and during `handleAuthResponse`. * Update `handleAuthResponse` reply to `"Unable to authenticate"` instead of the `AuthenticationState` exception. ### Verifying this change A new test is added. The added test covers the change made in #19295 where we updated `ServerCnx` so that we call `AuthState#authenticate` instead of relying on the implementation detail that the initialization calls `authenticate`. That PR should have added a test. ### Does this pull request potentially affect one of the following parts: This is not a breaking change. ### Documentation - [x] `doc-not-needed` ### Matching PR in forked repository PR in forked repository: michaeljmarshall#18
1 parent 689a33f commit 8049690

File tree

5 files changed

+213
-8
lines changed

5 files changed

+213
-8
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -969,10 +969,10 @@ protected void handleConnect(CommandConnect connect) {
969969
}
970970
} catch (Exception e) {
971971
service.getPulsarStats().recordConnectionCreateFail();
972+
state = State.Failed;
972973
logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
973-
String msg = "Unable to authenticate";
974-
writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
975-
close();
974+
ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Unable to authenticate");
975+
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
976976
}
977977
}
978978

@@ -994,15 +994,17 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
994994
authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY);
995995
} catch (AuthenticationException e) {
996996
service.getPulsarStats().recordConnectionCreateFail();
997+
state = State.Failed;
997998
log.warn("[{}] Authentication failed: {} ", remoteAddress, e.getMessage());
998-
writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, e.getMessage()));
999-
close();
999+
ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Unable to authenticate");
1000+
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
10001001
} catch (Exception e) {
10011002
service.getPulsarStats().recordConnectionCreateFail();
1003+
state = State.Failed;
10021004
String msg = "Unable to handleAuthResponse";
10031005
log.warn("[{}] {} ", remoteAddress, msg, e);
1004-
writeAndFlush(Commands.newError(-1, ServerError.UnknownError, msg));
1005-
close();
1006+
ByteBuf command = Commands.newError(-1, ServerError.UnknownError, msg);
1007+
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, command);
10061008
}
10071009
}
10081010

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.auth;
20+
21+
import javax.naming.AuthenticationException;
22+
import javax.net.ssl.SSLSession;
23+
import java.net.SocketAddress;
24+
import org.apache.pulsar.broker.authentication.AuthenticationState;
25+
import org.apache.pulsar.common.api.AuthData;
26+
27+
/**
28+
* Class that provides the same authentication semantics as the {@link MockAuthenticationProvider} except
29+
* that this one initializes the {@link MockMultiStageAuthenticationState} class to support testing
30+
* multistage authentication.
31+
*/
32+
public class MockMultiStageAuthenticationProvider extends MockAuthenticationProvider {
33+
34+
@Override
35+
public String getAuthMethodName() {
36+
return "multi-stage";
37+
}
38+
39+
@Override
40+
public AuthenticationState newAuthState(AuthData authData,
41+
SocketAddress remoteAddress,
42+
SSLSession sslSession) throws AuthenticationException {
43+
return new MockMultiStageAuthenticationState(this);
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.auth;
20+
21+
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
22+
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
23+
import org.apache.pulsar.broker.authentication.AuthenticationState;
24+
import org.apache.pulsar.common.api.AuthData;
25+
26+
import javax.naming.AuthenticationException;
27+
28+
import static java.nio.charset.StandardCharsets.UTF_8;
29+
30+
/**
31+
* Performs multistage authentication by extending the paradigm created in {@link MockAuthenticationProvider}.
32+
*/
33+
public class MockMultiStageAuthenticationState implements AuthenticationState {
34+
35+
private final MockMultiStageAuthenticationProvider provider;
36+
private String authRole = null;
37+
38+
MockMultiStageAuthenticationState(MockMultiStageAuthenticationProvider provider) {
39+
this.provider = provider;
40+
}
41+
42+
@Override
43+
public String getAuthRole() throws AuthenticationException {
44+
if (authRole == null) {
45+
throw new AuthenticationException("Must authenticate first");
46+
}
47+
return null;
48+
}
49+
50+
@Override
51+
public AuthData authenticate(AuthData authData) throws AuthenticationException {
52+
String data = new String(authData.getBytes(), UTF_8);
53+
String[] parts = data.split("\\.");
54+
if (parts.length == 2) {
55+
if ("challenge".equals(parts[0])) {
56+
return AuthData.of("challenged".getBytes());
57+
} else {
58+
AuthenticationDataCommand command = new AuthenticationDataCommand(data);
59+
authRole = provider.authenticate(command);
60+
// Auth successful, no more auth required
61+
return null;
62+
}
63+
}
64+
throw new AuthenticationException("Failed to authenticate");
65+
}
66+
67+
@Override
68+
public AuthenticationDataSource getAuthDataSource() {
69+
return null;
70+
}
71+
72+
@Override
73+
public boolean isComplete() {
74+
return authRole != null;
75+
}
76+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

+77-1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@
8080
import org.apache.pulsar.broker.PulsarService;
8181
import org.apache.pulsar.broker.ServiceConfiguration;
8282
import org.apache.pulsar.broker.TransactionMetadataStoreService;
83+
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
84+
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
8385
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
8486
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
8587
import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -103,6 +105,7 @@
103105
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
104106
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
105107
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
108+
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
106109
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
107110
import org.apache.pulsar.common.api.proto.CommandConnected;
108111
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
@@ -450,11 +453,84 @@ public void testConnectCommandWithAuthenticationNegative() throws Exception {
450453
ByteBuf clientCommand = Commands.newConnect("none", "", null);
451454
channel.writeInbound(clientCommand);
452455

453-
assertEquals(serverCnx.getState(), State.Start);
456+
assertEquals(serverCnx.getState(), State.Failed);
454457
assertTrue(getResponse() instanceof CommandError);
455458
channel.finish();
456459
}
457460

461+
@Test(timeOut = 30000)
462+
public void testConnectCommandWithFailingOriginalAuthData() throws Exception {
463+
AuthenticationService authenticationService = mock(AuthenticationService.class);
464+
AuthenticationProvider authenticationProvider = new MockAuthenticationProvider();
465+
String authMethodName = authenticationProvider.getAuthMethodName();
466+
467+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
468+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
469+
svcConfig.setAuthenticationEnabled(true);
470+
svcConfig.setAuthenticateOriginalAuthData(true);
471+
svcConfig.setProxyRoles(Collections.singleton("proxy"));
472+
473+
resetChannel();
474+
assertTrue(channel.isActive());
475+
assertEquals(serverCnx.getState(), State.Start);
476+
477+
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1,null,
478+
null, "client", "fail", authMethodName);
479+
channel.writeInbound(clientCommand);
480+
481+
// We currently expect two responses because the originalAuthData is verified after sending
482+
// a successful response to the proxy. Because this is a synchronous operation, there is currently
483+
// no risk. It would be better to fix this. See https://github.com/apache/pulsar/issues/19311.
484+
Object response1 = getResponse();
485+
assertTrue(response1 instanceof CommandConnected);
486+
Object response2 = getResponse();
487+
assertTrue(response2 instanceof CommandError);
488+
assertEquals(((CommandError) response2).getMessage(), "Unable to authenticate");
489+
assertEquals(serverCnx.getState(), State.Failed);
490+
assertFalse(serverCnx.isActive());
491+
channel.finish();
492+
}
493+
494+
@Test(timeOut = 30000)
495+
public void testAuthResponseWithFailingAuthData() throws Exception {
496+
AuthenticationService authenticationService = mock(AuthenticationService.class);
497+
AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider();
498+
String authMethodName = authenticationProvider.getAuthMethodName();
499+
500+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
501+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
502+
svcConfig.setAuthenticationEnabled(true);
503+
504+
resetChannel();
505+
assertTrue(channel.isActive());
506+
assertEquals(serverCnx.getState(), State.Start);
507+
508+
// Trigger connect command to result in AuthChallenge
509+
ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1");
510+
channel.writeInbound(clientCommand);
511+
512+
Object challenge1 = getResponse();
513+
assertTrue(challenge1 instanceof CommandAuthChallenge);
514+
515+
// Trigger another AuthChallenge to verify that code path continues to challenge
516+
ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()), 1, "1");
517+
channel.writeInbound(authResponse1);
518+
519+
Object challenge2 = getResponse();
520+
assertTrue(challenge2 instanceof CommandAuthChallenge);
521+
522+
// Trigger failure
523+
ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("fail.client".getBytes()), 1, "1");
524+
channel.writeInbound(authResponse2);
525+
526+
Object response3 = getResponse();
527+
assertTrue(response3 instanceof CommandError);
528+
assertEquals(((CommandError) response3).getMessage(), "Unable to authenticate");
529+
assertEquals(serverCnx.getState(), State.Failed);
530+
assertFalse(serverCnx.isActive());
531+
channel.finish();
532+
}
533+
458534
@Test(timeOut = 30000)
459535
public void testProducerCommand() throws Exception {
460536
resetChannel();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java

+6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Queue;
2222
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
2323
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
24+
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
2425
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
2526
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
2627
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
@@ -83,6 +84,11 @@ protected void handleConnected(CommandConnected connected) {
8384
queue.offer(new CommandConnected().copyFrom(connected));
8485
}
8586

87+
@Override
88+
protected void handleAuthChallenge(CommandAuthChallenge challenge) {
89+
queue.offer(new CommandAuthChallenge().copyFrom(challenge));
90+
}
91+
8692
@Override
8793
protected void handleSubscribe(CommandSubscribe subscribe) {
8894
queue.offer(new CommandSubscribe().copyFrom(subscribe));

0 commit comments

Comments
 (0)