Skip to content

Commit a823e02

Browse files
committed
[fix][authentication] Store the original authentication data (apache#19519)
Signed-off-by: Zixuan Liu <nodeces@gmail.com> (cherry picked from commit 2d90089) Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 240c22c commit a823e02

File tree

5 files changed

+171
-52
lines changed

5 files changed

+171
-52
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+14-11
Original file line numberDiff line numberDiff line change
@@ -643,20 +643,12 @@ private void doAuthentication(AuthData clientData,
643643
// 2. an authentication refresh, in which case we need to refresh authenticationData
644644

645645
String newAuthRole = authState.getAuthRole();
646+
AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource();
646647

647-
// Refresh the auth data.
648-
this.authenticationData = authState.getAuthDataSource();
649-
if (log.isDebugEnabled()) {
650-
log.debug("[{}] Auth data refreshed for role={}", remoteAddress, this.authRole);
651-
}
652-
648+
// Set the auth data and auth role
653649
if (!useOriginalAuthState) {
654650
this.authRole = newAuthRole;
655-
}
656-
657-
if (log.isDebugEnabled()) {
658-
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
659-
remoteAddress, authMethod, this.authRole, originalPrincipal);
651+
this.authenticationData = newAuthDataSource;
660652
}
661653

662654
if (state != State.Connected) {
@@ -676,7 +668,18 @@ private void doAuthentication(AuthData clientData,
676668
maybeScheduleAuthenticationCredentialsRefresh();
677669
}
678670
completeConnect(clientProtocolVersion, clientVersion);
671+
if (log.isDebugEnabled()) {
672+
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
673+
remoteAddress, authMethod, this.authRole, originalPrincipal);
674+
}
679675
} else {
676+
// Refresh the auth data
677+
if (!useOriginalAuthState) {
678+
this.authenticationData = newAuthDataSource;
679+
} else {
680+
this.originalAuthData = newAuthDataSource;
681+
}
682+
680683
// If the connection was already ready, it means we're doing a refresh
681684
if (!StringUtils.isEmpty(authRole)) {
682685
if (!authRole.equals(newAuthRole)) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java

+4-41
Original file line numberDiff line numberDiff line change
@@ -18,52 +18,15 @@
1818
*/
1919
package org.apache.pulsar.broker.auth;
2020

21-
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
22-
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
23-
import org.apache.pulsar.broker.authentication.AuthenticationState;
24-
import org.apache.pulsar.common.api.AuthData;
25-
26-
import javax.naming.AuthenticationException;
27-
28-
import static java.nio.charset.StandardCharsets.UTF_8;
21+
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
2922

3023
/**
3124
* Class to use when verifying the behavior around expired authentication data because it will always return
3225
* true when isExpired is called.
3326
*/
34-
public class MockAlwaysExpiredAuthenticationState implements AuthenticationState {
35-
final MockAlwaysExpiredAuthenticationProvider provider;
36-
AuthenticationDataSource authenticationDataSource;
37-
volatile String authRole;
38-
39-
MockAlwaysExpiredAuthenticationState(MockAlwaysExpiredAuthenticationProvider provider) {
40-
this.provider = provider;
41-
}
42-
43-
44-
@Override
45-
public String getAuthRole() throws AuthenticationException {
46-
if (authRole == null) {
47-
throw new AuthenticationException("Must authenticate first.");
48-
}
49-
return authRole;
50-
}
51-
52-
@Override
53-
public AuthData authenticate(AuthData authData) throws AuthenticationException {
54-
authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8));
55-
authRole = provider.authenticate(authenticationDataSource);
56-
return null;
57-
}
58-
59-
@Override
60-
public AuthenticationDataSource getAuthDataSource() {
61-
return authenticationDataSource;
62-
}
63-
64-
@Override
65-
public boolean isComplete() {
66-
return authRole != null;
27+
public class MockAlwaysExpiredAuthenticationState extends MockMutableAuthenticationState {
28+
MockAlwaysExpiredAuthenticationState(AuthenticationProvider provider) {
29+
super(provider);
6730
}
6831

6932
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.auth;
20+
21+
import org.apache.pulsar.broker.authentication.AuthenticationState;
22+
import org.apache.pulsar.common.api.AuthData;
23+
import javax.net.ssl.SSLSession;
24+
import java.net.SocketAddress;
25+
26+
public class MockMutableAuthenticationProvider extends MockAuthenticationProvider {
27+
public AuthenticationState newAuthState(AuthData authData,
28+
SocketAddress remoteAddress,
29+
SSLSession sslSession) {
30+
return new MockMutableAuthenticationState(this);
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.auth;
20+
21+
import static java.nio.charset.StandardCharsets.UTF_8;
22+
import javax.naming.AuthenticationException;
23+
import java.util.concurrent.CompletableFuture;
24+
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
25+
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
26+
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
27+
import org.apache.pulsar.broker.authentication.AuthenticationState;
28+
import org.apache.pulsar.common.api.AuthData;
29+
30+
// MockMutableAuthenticationState always update the authentication data source and auth role.
31+
public class MockMutableAuthenticationState implements AuthenticationState {
32+
final AuthenticationProvider provider;
33+
AuthenticationDataSource authenticationDataSource;
34+
volatile String authRole;
35+
36+
MockMutableAuthenticationState(AuthenticationProvider provider) {
37+
this.provider = provider;
38+
}
39+
40+
@Override
41+
public String getAuthRole() throws AuthenticationException {
42+
if (authRole == null) {
43+
throw new AuthenticationException("Must authenticate first.");
44+
}
45+
return authRole;
46+
}
47+
48+
/**
49+
* This authentication is always single stage, so it returns immediately
50+
*/
51+
@Override
52+
public AuthData authenticate(AuthData authData) throws AuthenticationException {
53+
authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8));
54+
authRole = provider.authenticate(authenticationDataSource);
55+
return null;
56+
}
57+
58+
@Override
59+
public AuthenticationDataSource getAuthDataSource() {
60+
return authenticationDataSource;
61+
}
62+
63+
@Override
64+
public boolean isComplete() {
65+
return true;
66+
}
67+
68+
@Override
69+
public boolean isExpired() {
70+
return false;
71+
}
72+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

+49
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.apache.pulsar.broker.PulsarService;
7777
import org.apache.pulsar.broker.ServiceConfiguration;
7878
import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
79+
import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider;
7980
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
8081
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
8182
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
@@ -1040,6 +1041,54 @@ public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exc
10401041
}));
10411042
}
10421043

1044+
@Test
1045+
public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception {
1046+
AuthenticationService authenticationService = mock(AuthenticationService.class);
1047+
AuthenticationProvider authenticationProvider = new MockMutableAuthenticationProvider();
1048+
String authMethodName = authenticationProvider.getAuthMethodName();
1049+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
1050+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
1051+
svcConfig.setAuthenticationEnabled(true);
1052+
svcConfig.setAuthenticateOriginalAuthData(true);
1053+
svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
1054+
1055+
resetChannel();
1056+
assertTrue(channel.isActive());
1057+
assertEquals(serverCnx.getState(), State.Start);
1058+
1059+
String proxyRole = "pass.proxy";
1060+
String clientRole = "pass.client";
1061+
ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost",
1062+
clientRole, clientRole, authMethodName);
1063+
channel.writeInbound(connect);
1064+
Object connectResponse = getResponse();
1065+
assertTrue(connectResponse instanceof CommandConnected);
1066+
assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole);
1067+
assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), clientRole);
1068+
assertEquals(serverCnx.getOriginalPrincipal(), clientRole);
1069+
assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
1070+
assertEquals(serverCnx.getAuthRole(), proxyRole);
1071+
assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);
1072+
1073+
// Request refreshing the original auth.
1074+
// Expected:
1075+
// 1. Original role and original data equals to "pass.RefreshOriginAuthData".
1076+
// 2. The broker disconnects the client, because the new role doesn't equal the old role.
1077+
String newClientRole = "pass.RefreshOriginAuthData";
1078+
ByteBuf refreshAuth = Commands.newAuthResponse(authMethodName,
1079+
AuthData.of(newClientRole.getBytes(StandardCharsets.UTF_8)), 0, "test");
1080+
channel.writeInbound(refreshAuth);
1081+
1082+
assertEquals(serverCnx.getOriginalAuthData().getCommandData(), newClientRole);
1083+
assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), newClientRole);
1084+
assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
1085+
assertEquals(serverCnx.getAuthRole(), proxyRole);
1086+
assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);
1087+
1088+
assertFalse(channel.isOpen());
1089+
assertFalse(channel.isActive());
1090+
}
1091+
10431092
@Test(timeOut = 30000)
10441093
public void testProducerCommand() throws Exception {
10451094
resetChannel();

0 commit comments

Comments
 (0)