Skip to content

Commit

Permalink
CRUD support for ResourceGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
rvaidyanathan-splunk committed Apr 13, 2021
1 parent dd19702 commit dcbb1aa
Show file tree
Hide file tree
Showing 21 changed files with 1,378 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand Down Expand Up @@ -56,6 +57,7 @@ public class ConfigurationCacheService {
private final ZooKeeperCache cache;
private ZooKeeperDataCache<TenantInfo> propertiesCache;
private ZooKeeperDataCache<Policies> policiesCache;
private ZooKeeperDataCache<ResourceGroup> resourcegroupsCache;
private ZooKeeperDataCache<ClusterData> clustersCache;
private ZooKeeperChildrenCache clustersListCache;
private ZooKeeperChildrenCache failureDomainListCache;
Expand All @@ -65,6 +67,7 @@ public class ConfigurationCacheService {
private PulsarResources pulsarResources;

public static final String POLICIES = "policies";
public static final String RESOURCEGROUPS = "resourcegroups";
public static final String FAILURE_DOMAIN = "failureDomain";
public final String CLUSTER_FAILURE_DOMAIN_ROOT;
public static final String POLICIES_ROOT = "/admin/policies";
Expand Down Expand Up @@ -101,6 +104,13 @@ public Policies deserialize(String path, byte[] content) throws Exception {
}
};

this.resourcegroupsCache = new ZooKeeperDataCache<ResourceGroup>(cache) {
@Override
public ResourceGroup deserialize(String path, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, ResourceGroup.class);
}
};

this.clustersCache = new ZooKeeperDataCache<ClusterData>(cache) {
@Override
public ClusterData deserialize(String path, byte[] content) throws Exception {
Expand Down Expand Up @@ -189,6 +199,10 @@ public ZooKeeperDataCache<Policies> policiesCache() {
return this.policiesCache;
}

public ZooKeeperDataCache<ResourceGroup> resourcegroupsCache() {
return this.resourcegroupsCache;
}

public ZooKeeperDataCache<ClusterData> clustersCache() {
return this.clustersCache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class PulsarResources {
public static final int DEFAULT_OPERATION_TIMEOUT_SEC = 30;
private TenantResources tenantResources;
private ClusterResources clusterResources;
private ResourceGroupResources resourcegroupResources;
private NamespaceResources namespaceResources;
private DynamicConfigurationResources dynamicConfigResources;
private LocalPoliciesResources localPolicies;
Expand All @@ -49,6 +50,7 @@ public PulsarResources(MetadataStoreExtended localMetadataStore, MetadataStoreEx
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(configurationMetadataStore, operationTimeoutSec);
namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
}
if (localMetadataStore != null) {
dynamicConfigResources = new DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resources;

import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

public class ResourceGroupResources extends BaseResources<ResourceGroup> {
public ResourceGroupResources(MetadataStoreExtended store, int operationTimeoutSec) {
super(store, ResourceGroup.class, operationTimeoutSec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.common.policies.data.Policies.getBundles;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -2565,5 +2566,26 @@ private void updatePolicies(String path, Function<Policies, Policies> updateFunc
}
}

private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
protected void internalSetNamespaceResourceGroup(String rgName) {
validateNamespacePolicyOperation(namespaceName, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

if (rgName != null) {
final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
// check resourcegroup exists.
try {
if (!resourceGroupResources().exists(resourceGroupPath)) {
throw new RestException(Status.PRECONDITION_FAILED, "ResourceGroup does not exist");
}
} catch (Exception e) {
log.error("[{}] Invalid ResourceGroup {}: {}", clientAppId(), rgName, e);
throw new RestException(e);
}
}

internalSetPolicies("resource_group_name", rgName);
}


private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
import java.util.Iterator;
import java.util.List;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ResourceGroupsBase extends AdminResource {
protected List<String> internalGetResourceGroups() {
try {
validateSuperUserAccess();
return getListOfResourcegroups("abc");
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to get ResourceGroups list ", clientAppId());
throw new RestException(Response.Status.NOT_FOUND, "Property does not exist");
} catch (Exception e) {
log.error("[{}] Failed to get ResourceGroups list: {}", clientAppId(), e);
throw new RestException(e);
}
}

protected ResourceGroup internalGetResourceGroup(String rgName) {
try {
final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
ResourceGroup resourceGroup = resourceGroupResources().get(resourceGroupPath)
.orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "ResourceGroup does not exist"));
return resourceGroup;
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get ResourceGroup {}", clientAppId(), rgName, e);
throw new RestException(e);
}
}

protected void internalUpdateResourceGroup(String rgName, ResourceGroup rgConfig) {
final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);

try {
ResourceGroup resourceGroup = resourceGroupResources().get(resourceGroupPath).orElseThrow(() ->
new RestException(Response.Status.NOT_FOUND, "ResourceGroup does not exist"));

/*
* assuming read-modify-write
*/
resourceGroup.publishRateInBytes = rgConfig.publishRateInBytes;
resourceGroup.publishRateInMsgs = rgConfig.publishRateInMsgs;
resourceGroup.dispatchRateInBytes = rgConfig.dispatchRateInBytes;
resourceGroup.dispatchRateInMsgs = rgConfig.dispatchRateInMsgs;

// write back the new ResourceGroup config.
resourceGroupResources().set(resourceGroupPath, r -> resourceGroup);
log.info("[{}] Successfully updated the ResourceGroup {}", clientAppId(), rgName);
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update configuration for ResourceGroup {}", clientAppId(), rgName, e);
throw new RestException(e);
}
}

protected void internalCreateResourceGroup(String rgName, ResourceGroup rgConfig) {
final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
try {
resourceGroupResources().create(resourceGroupPath, rgConfig);
log.info("[{}] Created ResourceGroup {}", clientAppId(), rgName);
} catch (MetadataStoreException.AlreadyExistsException e) {
log.warn("[{}] Failed to create ResourceGroup {} - already exists", clientAppId(), rgName);
throw new RestException(Response.Status.CONFLICT, "ResourceGroup already exists");
} catch (Exception e) {
log.error("[{}] Failed to create ResourceGroup {}", clientAppId(), rgName, e);
throw new RestException(e);
}

}
protected void internalCreateOrUpdateResourceGroup(String rgName, ResourceGroup rgConfig) {
try {
validateSuperUserAccess();
checkNotNull(rgConfig);
/*
* see if ResourceGroup exists and treat the request as a update if it does.
*/
final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
boolean rgExists = false;
try {
rgExists = resourceGroupResources().exists(resourceGroupPath);
} catch (Exception e) {
log.error("[{}] Failed to create/update ResourceGroup {}: {}", clientAppId(), rgName, e);
}

try {
if (rgExists) {
internalUpdateResourceGroup(rgName, rgConfig);
} else {
internalCreateResourceGroup(rgName, rgConfig);
}
} catch (Exception e) {
log.error("[{}] Failed to create/update ResourceGroup {}: {}", clientAppId(), rgName, e);
throw new RestException(e);
}
} catch (Exception e) {
log.error("[{}] Failed to create/update ResourceGroup {}: {}", clientAppId(), rgName, e);
throw new RestException(e);
}
}

protected boolean internalCheckRgInUse(String rgName) {
List<String> tenants;
try {
tenants = tenantResources().getChildren(path(POLICIES));
Iterator<String> tenantsIterator = tenants.iterator();
while (tenantsIterator.hasNext()) {
String tenant = tenantsIterator.next();
List<String> namespaces = getListOfNamespaces(tenant);
Iterator<String> namespaceIterator = namespaces.iterator();
while (namespaceIterator.hasNext()) {
String namespace = namespaceIterator.next();
Policies policies = getNamespacePolicies(NamespaceName.get(namespace));
if (null != policies && rgName.equals(policies.resource_group_name)) {
return true;
}
}
}
} catch (Exception e) {
log.error("[{}] Failed to get tenant/namespace list {}: {}", clientAppId(), rgName, e);
throw new RestException(e);
}
return false;
}

protected void internalDeleteResourceGroup(String rgName) {
/*
* need to walk the namespaces and make sure it is not in use
*/
try {
/*
* walk the namespaces and make sure it is not in use.
*/
if (internalCheckRgInUse(rgName)) {
throw new RestException(Response.Status.PRECONDITION_FAILED, "ResourceGroup is in use");
}
final String globalZkResourceGroupPath = path(RESOURCEGROUPS, rgName);
resourceGroupResources().delete(globalZkResourceGroupPath);
log.info("[{}] Deleted ResourceGroup {}", clientAppId(), rgName);
} catch (Exception e) {
log.error("[{}] Failed to delete ResourceGroup {}.", clientAppId(), rgName, e);
throw new RestException(e);
}
}

private static final Logger log = LoggerFactory.getLogger(ResourceGroupsBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1718,5 +1718,46 @@ public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
validateNamespaceName(tenant, namespace);
internalRemoveMaxTopicsPerNamespace();
}

@GET
@Path("/{tenant}/{namespace}/resourcegroup")
@ApiOperation(value = "Get the resourcegroup attached to the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public String getNamespaceResourceGroup(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.RESOURCEGROUP,
PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.resource_group_name;
}

@POST
@Path("/{tenant}/{namespace}/resourcegroup")
@ApiOperation(value = "Set resourcegroup for a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid resourcegroup") })
public void setNamespaceResourceGroup(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "Name of resourcegroup", required = true) String rgName) {
validateNamespaceName(tenant, namespace);
internalSetNamespaceResourceGroup(rgName);
}

@DELETE
@Path("/{tenant}/{namespace}/resourcegroup")
@ApiOperation(value = "Delete resourcegroup for a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid resourcegroup")})
public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetNamespaceResourceGroup(null);
}


private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Loading

0 comments on commit dcbb1aa

Please sign in to comment.