Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Add parameter check for create/update cluster. #19151

Merged
merged 8 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public void getCluster(@Suspended AsyncResponse asyncResponse,
)
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Cluster has been created."),
@ApiResponse(code = 400, message = "Bad request parameter."),
@ApiResponse(code = 403, message = "You don't have admin permission to create the cluster."),
@ApiResponse(code = 409, message = "Cluster already exists."),
@ApiResponse(code = 412, message = "Cluster name is not valid."),
Expand Down Expand Up @@ -161,6 +162,11 @@ public void createCluster(
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
NamedEntity.checkName(cluster);
try {
clusterData.checkPropertiesIfPresent();
} catch (IllegalArgumentException ex) {
throw new RestException(Status.BAD_REQUEST, ex.getMessage());
}
return clusterResources().getClusterAsync(cluster);
}).thenCompose(clusterOpt -> {
if (clusterOpt.isPresent()) {
Expand Down Expand Up @@ -190,6 +196,7 @@ public void createCluster(
notes = "This operation requires Pulsar superuser privileges.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Cluster has been updated."),
@ApiResponse(code = 400, message = "Bad request parameter."),
@ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."),
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
Expand All @@ -215,8 +222,14 @@ public void updateCluster(
) ClusterDataImpl clusterData) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> clusterResources().updateClusterAsync(cluster, old -> clusterData))
.thenAccept(__ -> {
.thenCompose(__ -> {
try {
clusterData.checkPropertiesIfPresent();
} catch (IllegalArgumentException ex) {
throw new RestException(Status.BAD_REQUEST, ex.getMessage());
}
return clusterResources().updateClusterAsync(cluster, old -> clusterData);
}).thenAccept(__ -> {
log.info("[{}] Updated cluster {}", clientAppId(), cluster);
asyncResponse.resume(Response.ok().build());
}).exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private EmbeddedPulsarCluster(int numBrokers, int numBookies, String metadataSto
.serviceHttpUrl(adminUrl)
.build();

admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl(serviceUrl).build());
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().brokerServiceUrl(serviceUrl).build());
admin.tenants().createTenant("public",
TenantInfo.builder().allowedClusters(Collections.singleton(CLUSTER_NAME)).build());
admin.namespaces().createNamespace("public/default", Collections.singleton(CLUSTER_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1941,7 +1941,7 @@ public void testUpdateClusterWithProxyUrl() throws Exception {
// update
cluster = ClusterData.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.proxyServiceUrl("proxy")
.proxyServiceUrl("pulsar://example.com")
.proxyProtocol(ProxyProtocol.SNI)
.build();
admin.clusters().updateCluster(clusterName, cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
import java.util.Set;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -51,6 +53,18 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testCreateClusterBadRequest() {
try {
admin.clusters()
.createCluster("bad_request", ClusterData.builder()
.serviceUrl("pulsar://example.com").build());
fail("Unexpected behaviour");
} catch (PulsarAdminException ex) {
assertEquals(ex.getStatusCode(), 400);
}
}

@Test
public void testDeleteNonExistCluster() {
String cluster = "test-non-exist-cluster-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,7 @@ public void clusters() throws Exception {
try {
asyncRequests(ctx -> clusters.createCluster(ctx, "auth", ClusterDataImpl.builder()
.serviceUrl("http://dummy.web.example.com")
.serviceUrlTls("")
.brokerServiceUrl("http://dummy.messaging.example.com")
.brokerServiceUrlTls("")
.brokerServiceUrl("pulsar://dummy.messaging.example.com")
.authenticationPlugin("authenticationPlugin")
.authenticationParameters("authenticationParameters")
.listenerName("listenerName")
Expand Down Expand Up @@ -775,7 +773,7 @@ public void resourceQuotas() throws Exception {
TenantInfoImpl admin = TenantInfoImpl.builder()
.allowedClusters(Collections.singleton(cluster))
.build();
ClusterDataImpl clusterData = ClusterDataImpl.builder().serviceUrl(cluster).build();
ClusterDataImpl clusterData = ClusterDataImpl.builder().serviceUrl("http://example.pulsar").build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a breaking change? I haven't reviewed the details.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a breaking change; we just didn't have data validation.

asyncRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData ));
asyncRequests(ctx -> properties.createTenant(ctx, property, admin));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ protected void cleanup() throws Exception {
*/
@Test
public void testSessionExpired() throws Exception {
admin.clusters().createCluster("my-cluster", ClusterData.builder().serviceUrl("test-url").build());
admin.clusters().createCluster("my-cluster", ClusterData.builder()
.serviceUrl("http://test-url").build());

assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));

Expand All @@ -64,12 +65,14 @@ public void testSessionExpired() throws Exception {
assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));

try {
admin.clusters().createCluster("my-cluster-2", ClusterData.builder().serviceUrl("test-url").build());
admin.clusters().createCluster("my-cluster-2", ClusterData.builder()
.serviceUrl("http://test-url").build());
fail("Should have failed, because global zk is down");
} catch (PulsarAdminException e) {
// Ok
}

admin.clusters().createCluster("cluster-2", ClusterData.builder().serviceUrl("test-url").build());
admin.clusters().createCluster("cluster-2", ClusterData.builder()
.serviceUrl("http://test-url").build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ protected void setup() throws Exception {

admin.clusters().createCluster(configClusterName,
ClusterData.builder()
.brokerServiceUrl(brokerUrl.toString())
.serviceUrl(getPulsar().getWebServiceAddress())
.serviceUrl(brokerUrl.toString())
.build()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ void setup(Method method) throws Exception {
primaryHost = pulsar.getWebServiceAddress();

// update cluster metadata
ClusterData clusterData = ClusterData.builder().serviceUrl(urlTls.toString()).build();
ClusterData clusterData = ClusterData.builder().serviceUrlTls(urlTls.toString()).build();
admin.clusters().updateCluster(config.getClusterName(), clusterData);

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ public void setup(Method method) throws Exception {
primaryHost = String.format("http://%s:%d", "localhost", pulsar.getListenPortHTTP().get());

// update cluster metadata
ClusterData clusterData = ClusterData.builder().serviceUrl(pulsar.getBrokerServiceUrlTls()).build();
ClusterData clusterData = ClusterData.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.serviceUrlTls(pulsar.getWebServiceAddressTls())
.brokerServiceUrl(pulsar.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
.build();
admin.clusters().updateCluster(config.getClusterName(), clusterData);

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.LinkedHashSet;
import java.util.Objects;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.common.util.URIPreconditions;

/**
* The configuration data for a cluster.
Expand Down Expand Up @@ -398,4 +400,30 @@ public ClusterDataImpl build() {
migratedClusterUrl);
}
}

/**
* Check cluster data properties by rule, if some property is illegal, it will throw
* {@link IllegalArgumentException}.
*
* @throws IllegalArgumentException exist illegal property.
*/
public void checkPropertiesIfPresent() throws IllegalArgumentException {
URIPreconditions.checkURIIfPresent(getServiceUrl(),
uri -> Objects.equals(uri.getScheme(), "http"),
"Illegal service url, example: http://pulsar.example.com:8080");
URIPreconditions.checkURIIfPresent(getServiceUrlTls(),
uri -> Objects.equals(uri.getScheme(), "https"),
"Illegal service tls url, example: https://pulsar.example.com:8443");
URIPreconditions.checkURIIfPresent(getBrokerServiceUrl(),
uri -> Objects.equals(uri.getScheme(), "pulsar"),
"Illegal broker service url, example: pulsar://pulsar.example.com:6650");
URIPreconditions.checkURIIfPresent(getBrokerServiceUrlTls(),
uri -> Objects.equals(uri.getScheme(), "pulsar+ssl"),
"Illegal broker service tls url, example: pulsar+ssl://pulsar.example.com:6651");
URIPreconditions.checkURIIfPresent(getProxyServiceUrl(),
uri -> Objects.equals(uri.getScheme(), "pulsar")
|| Objects.equals(uri.getScheme(), "pulsar+ssl"),
"Illegal proxy service url, example: pulsar+ssl://ats-proxy.example.com:4443 "
+ "or pulsar://ats-proxy.example.com:4080");
}
Comment on lines +410 to +428
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we document these requirements? Also, what would happen if a user had already created a cluster in a bad state?

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.util;

import static java.util.Objects.requireNonNull;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/**
* Static convenience URI checker.
*/
@ThreadSafe
public class URIPreconditions {

/**
* Check whether the given string is a legal URI and passes the user's check.
*
* @param uri URI String
* @param predicate User defined rule
* @throws IllegalArgumentException Illegal URI or failed in the user's rules
*/
public static void checkURI(@Nonnull String uri,
@Nonnull Predicate<URI> predicate) throws IllegalArgumentException {
checkURI(uri, predicate, null);
}

/**
* Check whether the given string is a legal URI and passes the user's check.
*
* @param uri URI String
* @param predicate User defined rule
* @throws IllegalArgumentException Illegal URI or failed in the user's rules
*/
public static void checkURIIfPresent(@Nullable String uri,
@Nonnull Predicate<URI> predicate) throws IllegalArgumentException {
checkURIIfPresent(uri, predicate, null);
}

/**
* Check whether the given string is a legal URI and passes the user's check.
*
* @param uri URI String
* @param predicate User defined rule
* @param errorMessage Error message
* @throws IllegalArgumentException Illegal URI or failed in the user's rules
*/
public static void checkURIIfPresent(@Nullable String uri,
@Nonnull Predicate<URI> predicate,
@Nullable String errorMessage) throws IllegalArgumentException {
if (uri == null) {
return;
}
checkURI(uri, predicate, errorMessage);
}

/**
* Check whether the given string is a legal URI and passes the user's check.
*
* @param uri URI String
* @param predicate User defined rule
* @param errorMessage Error message
* @throws IllegalArgumentException Illegal URI or failed in the user's rules
*/
public static void checkURI(@Nonnull String uri,
@Nonnull Predicate<URI> predicate,
@Nullable String errorMessage) throws IllegalArgumentException {
requireNonNull(uri, "uri");
requireNonNull(predicate, "predicate");
try {
URI u = new URI(uri);
if (!predicate.test(u)) {
throw new IllegalArgumentException(errorMessage == null ? "Illegal syntax: " + uri : errorMessage);
}
} catch (URISyntaxException e) {
throw new IllegalArgumentException(errorMessage == null ? "Illegal syntax: " + uri : errorMessage);
}
}
}
Loading