Skip to content

Commit f4c2d9d

Browse files
nodecemichaeljmarshall
authored andcommitted
[fix][broker] Fix token expiration (apache#16016)
### Motivation When token expiration, the broker requests the client to refresh the token, then the broker performs `org.apache.pulsar.broker.service.ServerCnx#doAuthentication` when the broker receives the auth response, which uses `org.apache.pulsar.broker.authentication.AuthenticationState#authenticate` to authentication, but the `org.apache.pulsar.broker.authentication.AuthenticationProviderToken.TokenAuthenticationState#authenticate` doesn't do anything, this cause a loop to refresh the token. Right now the token is only validated in the TokenAuthenticationState constructor, we need to add a check to the authenticate method. (cherry picked from commit 7576c93)
1 parent 7e87e9d commit f4c2d9d

File tree

3 files changed

+177
-1
lines changed

3 files changed

+177
-1
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,8 @@ public String getAuthRole() throws AuthenticationException {
356356
*/
357357
@Override
358358
public AuthData authenticate(AuthData authData) throws AuthenticationException {
359-
// There's no additional auth stage required
359+
String token = new String(authData.getBytes(), UTF_8);
360+
checkExpiration(token);
360361
return null;
361362
}
362363

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java

+6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@
3131
import org.testng.annotations.BeforeMethod;
3232

3333
public abstract class ProducerConsumerBase extends MockedPulsarServiceBaseTest {
34+
protected final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
35+
protected final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
36+
protected final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
37+
protected final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
38+
protected final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
39+
3440
protected String methodName;
3541

3642
@BeforeMethod(alwaysRun = true)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.pulsar.client.api;
21+
22+
import static org.testng.Assert.assertThrows;
23+
import static org.testng.Assert.assertTrue;
24+
import com.google.common.collect.Sets;
25+
import io.jsonwebtoken.Jwts;
26+
import io.jsonwebtoken.SignatureAlgorithm;
27+
import lombok.Cleanup;
28+
import lombok.extern.slf4j.Slf4j;
29+
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
30+
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
31+
import org.apache.pulsar.client.admin.PulsarAdmin;
32+
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
33+
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
34+
import org.apache.pulsar.common.naming.NamespaceName;
35+
import org.apache.pulsar.common.policies.data.AuthAction;
36+
import org.apache.pulsar.common.policies.data.ClusterData;
37+
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
38+
import org.awaitility.Awaitility;
39+
import org.testng.annotations.AfterMethod;
40+
import org.testng.annotations.BeforeMethod;
41+
import org.testng.annotations.Test;
42+
import javax.crypto.SecretKey;
43+
import java.time.Duration;
44+
import java.util.Base64;
45+
import java.util.Calendar;
46+
import java.util.Date;
47+
import java.util.HashSet;
48+
import java.util.Optional;
49+
import java.util.Set;
50+
import java.util.concurrent.TimeUnit;
51+
52+
@Test(groups = "broker-api")
53+
@Slf4j
54+
public class TokenExpirationProduceConsumerTest extends TlsProducerConsumerBase {
55+
private final String tenant ="my-tenant";
56+
private final NamespaceName namespaceName = NamespaceName.get("my-tenant","my-ns");
57+
58+
@BeforeMethod
59+
@Override
60+
protected void setup() throws Exception {
61+
// TLS configuration for Broker
62+
internalSetUpForBroker();
63+
64+
// Start Broker
65+
super.init();
66+
67+
admin = getAdmin(ADMIN_TOKEN);
68+
admin.clusters().createCluster(configClusterName,
69+
ClusterData.builder()
70+
.serviceUrl(brokerUrl.toString())
71+
.serviceUrlTls(brokerUrlTls.toString())
72+
.brokerServiceUrl(pulsar.getBrokerServiceUrl())
73+
.brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
74+
.build());
75+
admin.tenants().createTenant(tenant,
76+
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(configClusterName)));
77+
admin.namespaces().createNamespace(namespaceName.toString());
78+
}
79+
80+
@AfterMethod(alwaysRun = true)
81+
@Override
82+
protected void cleanup() throws Exception {
83+
super.internalCleanup();
84+
}
85+
86+
private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
87+
public static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
88+
89+
public String getExpireToken(String role, Date date) {
90+
return Jwts.builder().setSubject(role).signWith(SECRET_KEY)
91+
.setExpiration(date).compact();
92+
}
93+
94+
protected void internalSetUpForBroker() {
95+
conf.setBrokerServicePortTls(Optional.of(0));
96+
conf.setWebServicePortTls(Optional.of(0));
97+
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
98+
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
99+
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
100+
conf.setClusterName(configClusterName);
101+
conf.setAuthenticationRefreshCheckSeconds(1);
102+
conf.setTlsRequireTrustedClientCertOnConnect(false);
103+
conf.setTlsAllowInsecureConnection(false);
104+
conf.setAuthenticationEnabled(true);
105+
conf.setTransactionCoordinatorEnabled(true);
106+
conf.setSuperUserRoles(Sets.newHashSet("admin"));
107+
conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
108+
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
109+
conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
110+
conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
111+
+ Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
112+
}
113+
114+
private PulsarClient getClient(String token) throws Exception {
115+
ClientBuilder clientBuilder = PulsarClient.builder()
116+
.serviceUrl(pulsar.getBrokerServiceUrlTls())
117+
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
118+
.enableTls(true)
119+
.allowTlsInsecureConnection(false)
120+
.enableTlsHostnameVerification(true)
121+
.authentication(AuthenticationToken.class.getName(),"token:" +token)
122+
.operationTimeout(1000, TimeUnit.MILLISECONDS);
123+
return clientBuilder.build();
124+
}
125+
126+
private PulsarAdmin getAdmin(String token) throws Exception {
127+
PulsarAdminBuilder clientBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddressTls())
128+
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
129+
.allowTlsInsecureConnection(false)
130+
.authentication(AuthenticationToken.class.getName(),"token:" +token)
131+
.enableTlsHostnameVerification(true);
132+
return clientBuilder.build();
133+
}
134+
135+
@Test
136+
public void testTokenExpirationProduceConsumer() throws Exception {
137+
Calendar calendar = Calendar.getInstance();
138+
calendar.add(Calendar.SECOND, 20);
139+
String role = "test";
140+
String token = getExpireToken(role, calendar.getTime());
141+
Date expiredTime = calendar.getTime();
142+
143+
Set<AuthAction> permissions = new HashSet<>();
144+
permissions.add(AuthAction.consume);
145+
permissions.add(AuthAction.produce);
146+
admin.namespaces().grantPermissionOnNamespace(namespaceName.toString(), role, permissions);
147+
148+
@Cleanup
149+
PulsarClient pulsarClient = getClient(token);
150+
String topic = namespaceName + "/test-token";
151+
152+
@Cleanup final Consumer<byte[]> consumer = pulsarClient.newConsumer()
153+
.topic(topic)
154+
.subscriptionName("test-token")
155+
.subscribe();
156+
@Cleanup final Producer<byte[]> producer = pulsarClient.newProducer()
157+
.topic(topic)
158+
.create();
159+
160+
Awaitility.await().timeout(Duration.ofSeconds(60)).pollInterval(3, TimeUnit.SECONDS).untilAsserted(() -> {
161+
assertThrows(PulsarClientException.TimeoutException.class, () -> {
162+
producer.send("heart beat".getBytes());
163+
Message<byte[]> message = consumer.receive();
164+
consumer.acknowledge(message);
165+
});
166+
assertTrue(new Date().compareTo(expiredTime) > 0);
167+
});
168+
}
169+
}

0 commit comments

Comments
 (0)