Skip to content

Commit e829672

Browse files
authored
[improve][broker] Add parameter check for create/update cluster. (#19151)
1 parent 6335fa1 commit e829672

File tree

14 files changed

+313
-17
lines changed

14 files changed

+313
-17
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public void getCluster(@Suspended AsyncResponse asyncResponse,
133133
)
134134
@ApiResponses(value = {
135135
@ApiResponse(code = 204, message = "Cluster has been created."),
136+
@ApiResponse(code = 400, message = "Bad request parameter."),
136137
@ApiResponse(code = 403, message = "You don't have admin permission to create the cluster."),
137138
@ApiResponse(code = 409, message = "Cluster already exists."),
138139
@ApiResponse(code = 412, message = "Cluster name is not valid."),
@@ -161,6 +162,11 @@ public void createCluster(
161162
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
162163
.thenCompose(__ -> {
163164
NamedEntity.checkName(cluster);
165+
try {
166+
clusterData.checkPropertiesIfPresent();
167+
} catch (IllegalArgumentException ex) {
168+
throw new RestException(Status.BAD_REQUEST, ex.getMessage());
169+
}
164170
return clusterResources().getClusterAsync(cluster);
165171
}).thenCompose(clusterOpt -> {
166172
if (clusterOpt.isPresent()) {
@@ -190,6 +196,7 @@ public void createCluster(
190196
notes = "This operation requires Pulsar superuser privileges.")
191197
@ApiResponses(value = {
192198
@ApiResponse(code = 204, message = "Cluster has been updated."),
199+
@ApiResponse(code = 400, message = "Bad request parameter."),
193200
@ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."),
194201
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
195202
@ApiResponse(code = 500, message = "Internal server error.")
@@ -215,8 +222,14 @@ public void updateCluster(
215222
) ClusterDataImpl clusterData) {
216223
validateSuperUserAccessAsync()
217224
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
218-
.thenCompose(__ -> clusterResources().updateClusterAsync(cluster, old -> clusterData))
219-
.thenAccept(__ -> {
225+
.thenCompose(__ -> {
226+
try {
227+
clusterData.checkPropertiesIfPresent();
228+
} catch (IllegalArgumentException ex) {
229+
throw new RestException(Status.BAD_REQUEST, ex.getMessage());
230+
}
231+
return clusterResources().updateClusterAsync(cluster, old -> clusterData);
232+
}).thenAccept(__ -> {
220233
log.info("[{}] Updated cluster {}", clientAppId(), cluster);
221234
asyncResponse.resume(Response.ok().build());
222235
}).exceptionally(ex -> {

pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ private EmbeddedPulsarCluster(int numBrokers, int numBookies, String metadataSto
7878
.serviceHttpUrl(adminUrl)
7979
.build();
8080

81-
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl(serviceUrl).build());
81+
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().brokerServiceUrl(serviceUrl).build());
8282
admin.tenants().createTenant("public",
8383
TenantInfo.builder().allowedClusters(Collections.singleton(CLUSTER_NAME)).build());
8484
admin.namespaces().createNamespace("public/default", Collections.singleton(CLUSTER_NAME));

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1941,7 +1941,7 @@ public void testUpdateClusterWithProxyUrl() throws Exception {
19411941
// update
19421942
cluster = ClusterData.builder()
19431943
.serviceUrl(pulsar.getWebServiceAddress())
1944-
.proxyServiceUrl("proxy")
1944+
.proxyServiceUrl("pulsar://example.com")
19451945
.proxyProtocol(ProxyProtocol.SNI)
19461946
.build();
19471947
admin.clusters().updateCluster(clusterName, cluster);

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiClusterTest.java

+14
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
*/
1919
package org.apache.pulsar.broker.admin;
2020

21+
import static org.testng.Assert.assertEquals;
2122
import static org.testng.Assert.assertNotNull;
2223
import static org.testng.Assert.assertThrows;
24+
import static org.testng.Assert.fail;
2325
import java.util.Set;
2426
import java.util.UUID;
2527
import lombok.extern.slf4j.Slf4j;
@@ -51,6 +53,18 @@ public void cleanup() throws Exception {
5153
super.internalCleanup();
5254
}
5355

56+
@Test
57+
public void testCreateClusterBadRequest() {
58+
try {
59+
admin.clusters()
60+
.createCluster("bad_request", ClusterData.builder()
61+
.serviceUrl("pulsar://example.com").build());
62+
fail("Unexpected behaviour");
63+
} catch (PulsarAdminException ex) {
64+
assertEquals(ex.getStatusCode(), 400);
65+
}
66+
}
67+
5468
@Test
5569
public void testDeleteNonExistCluster() {
5670
String cluster = "test-non-exist-cluster-" + UUID.randomUUID();

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -444,9 +444,7 @@ public void clusters() throws Exception {
444444
try {
445445
asyncRequests(ctx -> clusters.createCluster(ctx, "auth", ClusterDataImpl.builder()
446446
.serviceUrl("http://dummy.web.example.com")
447-
.serviceUrlTls("")
448-
.brokerServiceUrl("http://dummy.messaging.example.com")
449-
.brokerServiceUrlTls("")
447+
.brokerServiceUrl("pulsar://dummy.messaging.example.com")
450448
.authenticationPlugin("authenticationPlugin")
451449
.authenticationParameters("authenticationParameters")
452450
.listenerName("listenerName")
@@ -775,7 +773,7 @@ public void resourceQuotas() throws Exception {
775773
TenantInfoImpl admin = TenantInfoImpl.builder()
776774
.allowedClusters(Collections.singleton(cluster))
777775
.build();
778-
ClusterDataImpl clusterData = ClusterDataImpl.builder().serviceUrl(cluster).build();
776+
ClusterDataImpl clusterData = ClusterDataImpl.builder().serviceUrl("http://example.pulsar").build();
779777
asyncRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData ));
780778
asyncRequests(ctx -> properties.createTenant(ctx, property, admin));
781779

pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ protected void cleanup() throws Exception {
5252
*/
5353
@Test
5454
public void testSessionExpired() throws Exception {
55-
admin.clusters().createCluster("my-cluster", ClusterData.builder().serviceUrl("test-url").build());
55+
admin.clusters().createCluster("my-cluster", ClusterData.builder()
56+
.serviceUrl("http://test-url").build());
5657

5758
assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));
5859

@@ -64,12 +65,14 @@ public void testSessionExpired() throws Exception {
6465
assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));
6566

6667
try {
67-
admin.clusters().createCluster("my-cluster-2", ClusterData.builder().serviceUrl("test-url").build());
68+
admin.clusters().createCluster("my-cluster-2", ClusterData.builder()
69+
.serviceUrl("http://test-url").build());
6870
fail("Should have failed, because global zk is down");
6971
} catch (PulsarAdminException e) {
7072
// Ok
7173
}
7274

73-
admin.clusters().createCluster("cluster-2", ClusterData.builder().serviceUrl("test-url").build());
75+
admin.clusters().createCluster("cluster-2", ClusterData.builder()
76+
.serviceUrl("http://test-url").build());
7477
}
7578
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,7 @@ protected void setup() throws Exception {
111111

112112
admin.clusters().createCluster(configClusterName,
113113
ClusterData.builder()
114-
.brokerServiceUrl(brokerUrl.toString())
115-
.serviceUrl(getPulsar().getWebServiceAddress())
114+
.serviceUrl(brokerUrl.toString())
116115
.build()
117116
);
118117
}

pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ void setup(Method method) throws Exception {
179179
primaryHost = pulsar.getWebServiceAddress();
180180

181181
// update cluster metadata
182-
ClusterData clusterData = ClusterData.builder().serviceUrl(urlTls.toString()).build();
182+
ClusterData clusterData = ClusterData.builder().serviceUrlTls(urlTls.toString()).build();
183183
admin.clusters().updateCluster(config.getClusterName(), clusterData);
184184

185185
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());

pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,12 @@ public void setup(Method method) throws Exception {
186186
primaryHost = String.format("http://%s:%d", "localhost", pulsar.getListenPortHTTP().get());
187187

188188
// update cluster metadata
189-
ClusterData clusterData = ClusterData.builder().serviceUrl(pulsar.getBrokerServiceUrlTls()).build();
189+
ClusterData clusterData = ClusterData.builder()
190+
.serviceUrl(pulsar.getWebServiceAddress())
191+
.serviceUrlTls(pulsar.getWebServiceAddressTls())
192+
.brokerServiceUrl(pulsar.getBrokerServiceUrl())
193+
.brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
194+
.build();
190195
admin.clusters().updateCluster(config.getClusterName(), clusterData);
191196

192197
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());

pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java

+28
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import io.swagger.annotations.ApiModel;
2222
import io.swagger.annotations.ApiModelProperty;
2323
import java.util.LinkedHashSet;
24+
import java.util.Objects;
2425
import lombok.AllArgsConstructor;
2526
import lombok.Data;
2627
import lombok.NoArgsConstructor;
2728
import org.apache.pulsar.client.api.ProxyProtocol;
29+
import org.apache.pulsar.common.util.URIPreconditions;
2830

2931
/**
3032
* The configuration data for a cluster.
@@ -400,4 +402,30 @@ public ClusterDataImpl build() {
400402
migratedClusterUrl);
401403
}
402404
}
405+
406+
/**
407+
* Check cluster data properties by rule, if some property is illegal, it will throw
408+
* {@link IllegalArgumentException}.
409+
*
410+
* @throws IllegalArgumentException exist illegal property.
411+
*/
412+
public void checkPropertiesIfPresent() throws IllegalArgumentException {
413+
URIPreconditions.checkURIIfPresent(getServiceUrl(),
414+
uri -> Objects.equals(uri.getScheme(), "http"),
415+
"Illegal service url, example: http://pulsar.example.com:8080");
416+
URIPreconditions.checkURIIfPresent(getServiceUrlTls(),
417+
uri -> Objects.equals(uri.getScheme(), "https"),
418+
"Illegal service tls url, example: https://pulsar.example.com:8443");
419+
URIPreconditions.checkURIIfPresent(getBrokerServiceUrl(),
420+
uri -> Objects.equals(uri.getScheme(), "pulsar"),
421+
"Illegal broker service url, example: pulsar://pulsar.example.com:6650");
422+
URIPreconditions.checkURIIfPresent(getBrokerServiceUrlTls(),
423+
uri -> Objects.equals(uri.getScheme(), "pulsar+ssl"),
424+
"Illegal broker service tls url, example: pulsar+ssl://pulsar.example.com:6651");
425+
URIPreconditions.checkURIIfPresent(getProxyServiceUrl(),
426+
uri -> Objects.equals(uri.getScheme(), "pulsar")
427+
|| Objects.equals(uri.getScheme(), "pulsar+ssl"),
428+
"Illegal proxy service url, example: pulsar+ssl://ats-proxy.example.com:4443 "
429+
+ "or pulsar://ats-proxy.example.com:4080");
430+
}
403431
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.util;
20+
21+
import static java.util.Objects.requireNonNull;
22+
import java.net.URI;
23+
import java.net.URISyntaxException;
24+
import java.util.function.Predicate;
25+
import javax.annotation.Nonnull;
26+
import javax.annotation.Nullable;
27+
import javax.annotation.concurrent.ThreadSafe;
28+
29+
/**
30+
* Static convenience URI checker.
31+
*/
32+
@ThreadSafe
33+
public class URIPreconditions {
34+
35+
/**
36+
* Check whether the given string is a legal URI and passes the user's check.
37+
*
38+
* @param uri URI String
39+
* @param predicate User defined rule
40+
* @throws IllegalArgumentException Illegal URI or failed in the user's rules
41+
*/
42+
public static void checkURI(@Nonnull String uri,
43+
@Nonnull Predicate<URI> predicate) throws IllegalArgumentException {
44+
checkURI(uri, predicate, null);
45+
}
46+
47+
/**
48+
* Check whether the given string is a legal URI and passes the user's check.
49+
*
50+
* @param uri URI String
51+
* @param predicate User defined rule
52+
* @throws IllegalArgumentException Illegal URI or failed in the user's rules
53+
*/
54+
public static void checkURIIfPresent(@Nullable String uri,
55+
@Nonnull Predicate<URI> predicate) throws IllegalArgumentException {
56+
checkURIIfPresent(uri, predicate, null);
57+
}
58+
59+
/**
60+
* Check whether the given string is a legal URI and passes the user's check.
61+
*
62+
* @param uri URI String
63+
* @param predicate User defined rule
64+
* @param errorMessage Error message
65+
* @throws IllegalArgumentException Illegal URI or failed in the user's rules
66+
*/
67+
public static void checkURIIfPresent(@Nullable String uri,
68+
@Nonnull Predicate<URI> predicate,
69+
@Nullable String errorMessage) throws IllegalArgumentException {
70+
if (uri == null) {
71+
return;
72+
}
73+
checkURI(uri, predicate, errorMessage);
74+
}
75+
76+
/**
77+
* Check whether the given string is a legal URI and passes the user's check.
78+
*
79+
* @param uri URI String
80+
* @param predicate User defined rule
81+
* @param errorMessage Error message
82+
* @throws IllegalArgumentException Illegal URI or failed in the user's rules
83+
*/
84+
public static void checkURI(@Nonnull String uri,
85+
@Nonnull Predicate<URI> predicate,
86+
@Nullable String errorMessage) throws IllegalArgumentException {
87+
requireNonNull(uri, "uri");
88+
requireNonNull(predicate, "predicate");
89+
try {
90+
URI u = new URI(uri);
91+
if (!predicate.test(u)) {
92+
throw new IllegalArgumentException(errorMessage == null ? "Illegal syntax: " + uri : errorMessage);
93+
}
94+
} catch (URISyntaxException e) {
95+
throw new IllegalArgumentException(errorMessage == null ? "Illegal syntax: " + uri : errorMessage);
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)