Skip to content

Commit

Permalink
Fixes function api can not use authdata to check superuser (#10364)
Browse files Browse the repository at this point in the history
---

Fixes #10332

*Motivation*

Sometimes, the superuser is not only needed the client
role but also needs some data in the authentication data
to check whether the role is the superuser.

The changed interface is introduced from #8560,
it's safe to change it since there is no release for that.
  • Loading branch information
zymap authored May 4, 2021
1 parent 06bb15b commit 76c93a3
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ public void startFunction(
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("path") String path) {
functions().uploadFunction(uploadedInputStream, path, clientAppId());
functions().uploadFunction(uploadedInputStream, path, clientAppId(), clientAuthData());
}

@GET
Expand Down Expand Up @@ -735,6 +735,6 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan
final @FormDataParam("delete") boolean delete) {

functions().updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
delete, uri.getRequestUri(), clientAppId());
delete, uri.getRequestUri(), clientAppId(), clientAuthData());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,6 @@ public List<ConfigFieldDefinition> getSinkConfigDefinition(
})
@Path("/reloadBuiltInSinks")
public void reloadSinks() {
sinks().reloadConnectors(clientAppId());
sinks().reloadConnectors(clientAppId(), clientAuthData());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,6 @@ public List<ConfigFieldDefinition> getSourceConfigDefinition(
})
@Path("/reloadBuiltInSources")
public void reloadSources() {
sources().reloadConnectors(clientAppId());
sources().reloadConnectors(clientAppId(), clientAuthData());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -901,13 +901,13 @@ public List<ConnectorDefinition> getListOfConnectors() {
}

@Override
public void reloadConnectors(String clientRole) {
public void reloadConnectors(String clientRole, AuthenticationDataSource authenticationData) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
if (worker().getWorkerConfig().isAuthorizationEnabled()) {
// Only superuser has permission to do this operation.
if (!isSuperUser(clientRole)) {
if (!isSuperUser(clientRole, authenticationData)) {
throw new RestException(Status.UNAUTHORIZED, "This operation requires super-user access");
}
}
Expand Down Expand Up @@ -1205,13 +1205,14 @@ public void putFunctionState(final String tenant,
}

@Override
public void uploadFunction(final InputStream uploadedInputStream, final String path, String clientRole) {
public void uploadFunction(final InputStream uploadedInputStream, final String path, String clientRole,
AuthenticationDataSource authenticationData) {

if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}

if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole, authenticationData)) {
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}

Expand Down Expand Up @@ -1309,7 +1310,7 @@ public StreamingOutput downloadFunction(final String path, String clientRole, Au
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
} else {
if (!isSuperUser(clientRole)) {
if (!isSuperUser(clientRole, clientAuthenticationDataHttps)) {
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
}
Expand Down Expand Up @@ -1441,7 +1442,7 @@ public boolean isAuthorizedRole(String tenant, String namespace, String clientRo
AuthenticationDataSource authenticationData) throws PulsarAdminException {
if (worker().getWorkerConfig().isAuthorizationEnabled()) {
// skip authorization if client role is super-user
if (isSuperUser(clientRole)) {
if (isSuperUser(clientRole, authenticationData)) {
return true;
}

Expand Down Expand Up @@ -1524,14 +1525,14 @@ protected void componentInstanceStatusRequestValidate (final String tenant,
}
}

public boolean isSuperUser(String clientRole) {
public boolean isSuperUser(String clientRole, AuthenticationDataSource authenticationData) {
if (clientRole != null) {
try {
if ((worker().getWorkerConfig().getSuperUserRoles() != null
&& worker().getWorkerConfig().getSuperUserRoles().contains(clientRole))) {
return true;
}
return worker().getAuthorizationService().isSuperUser(clientRole, null)
return worker().getAuthorizationService().isSuperUser(clientRole, authenticationData)
.get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
log.warn("Time-out {} sec while checking the role {} is a super user role ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,14 +670,15 @@ public void updateFunctionOnWorkerLeader(final String tenant,
final InputStream uploadedInputStream,
final boolean delete,
URI uri,
final String clientRole) {
final String clientRole,
AuthenticationDataSource authenticationData) {

if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}

if (worker().getWorkerConfig().isAuthorizationEnabled()) {
if (!isSuperUser(clientRole)) {
if (!isSuperUser(clientRole, authenticationData)) {
log.error("{}/{}/{} Client [{}] is not superuser to update on worker leader {}", tenant, namespace,
functionName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public Response stopFunctionInstances(String tenant, String namespace, String fu

@Override
public Response uploadFunction(InputStream uploadedInputStream, String path, String clientRole) {
delegate.uploadFunction(uploadedInputStream, path, clientRole);
delegate.uploadFunction(uploadedInputStream, path, clientRole, null);
return Response.ok().build();
}

Expand Down Expand Up @@ -248,4 +248,4 @@ private InstanceCommunication.FunctionStatus toProto(

return functionStatus;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public void startFunction(final @PathParam("tenant") String tenant,
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("path") String path) {
functions().uploadFunction(uploadedInputStream, path, clientAppId());
functions().uploadFunction(uploadedInputStream, path, clientAppId(), clientAuthData());
}

@GET
Expand Down Expand Up @@ -386,6 +386,6 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan
final @FormDataParam("delete") boolean delete) {

functions().updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
delete, uri.getRequestUri(), clientAppId());
delete, uri.getRequestUri(), clientAppId(), clientAuthData());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,6 @@ public List<ConfigFieldDefinition> getSinkConfigDefinition(
})
@Path("/reloadBuiltInSinks")
public void reloadSinks() {
sinks().reloadConnectors(clientAppId());
sinks().reloadConnectors(clientAppId(), clientAuthData());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,6 @@ public List<ConfigFieldDefinition> getSourceConfigDefinition(
})
@Path("/reloadBuiltInSources")
public void reloadSources() {
sources().reloadConnectors(clientAppId());
sources().reloadConnectors(clientAppId(), clientAuthData());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ void putFunctionState(final String tenant,

void uploadFunction(final InputStream uploadedInputStream,
final String path,
String clientRole);
String clientRole,
final AuthenticationDataSource clientAuthenticationDataHttps);

StreamingOutput downloadFunction(String path,
String clientRole,
Expand All @@ -154,5 +155,5 @@ StreamingOutput downloadFunction(String tenant,
List<ConnectorDefinition> getListOfConnectors();


void reloadConnectors(String clientRole);
void reloadConnectors(String clientRole, AuthenticationDataSource clientAuthenticationDataHttps);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ void updateFunctionOnWorkerLeader(final String tenant,
final InputStream uploadedInputStream,
final boolean delete,
URI uri,
final String clientRole);
final String clientRole,
final AuthenticationDataSource clientAuthenticationDataHttps);

FunctionStatus getFunctionStatus(final String tenant,
final String namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker.rest.api;

import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttp;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.admin.Namespaces;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

import javax.servlet.http.HttpServletRequest;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -261,9 +263,10 @@ public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedExcep

// test pulsar super user
final String pulsarSuperUser = "pulsarSuperUser";
when(authorizationService.isSuperUser(pulsarSuperUser, null)).thenReturn(CompletableFuture.completedFuture(true));
when(authorizationService.isSuperUser(eq(pulsarSuperUser), any()))
.thenReturn(CompletableFuture.completedFuture(true));
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", pulsarSuperUser, authenticationDataSource));
assertTrue(functionImpl.isSuperUser(pulsarSuperUser));
assertTrue(functionImpl.isSuperUser(pulsarSuperUser, null));

// test normal user
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
Expand All @@ -274,7 +277,7 @@ public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedExcep
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
when(authorizationService.isSuperUser("test-user", null)).thenReturn(CompletableFuture.completedFuture(false));
when(authorizationService.isSuperUser(eq("test-user"), any())).thenReturn(CompletableFuture.completedFuture(false));
assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));

// if user is tenant admin
Expand Down Expand Up @@ -333,10 +336,10 @@ public void testIsSuperUser() throws PulsarAdminException {
});

AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
assertTrue(functionImpl.isSuperUser(superUser));
assertTrue(functionImpl.isSuperUser(superUser, null));

assertFalse(functionImpl.isSuperUser("normal-user"));
assertFalse(functionImpl.isSuperUser( null));
assertFalse(functionImpl.isSuperUser("normal-user", null));
assertFalse(functionImpl.isSuperUser( null, null));

// test super roles is null and it's not a pulsar super user
when(authorizationService.isSuperUser(superUser, null))
Expand All @@ -345,7 +348,22 @@ public void testIsSuperUser() throws PulsarAdminException {
workerConfig = new WorkerConfig();
workerConfig.setAuthorizationEnabled(true);
doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();
assertFalse(functionImpl.isSuperUser(superUser));
assertFalse(functionImpl.isSuperUser(superUser, null));

// test super role is null but the auth datasource contains superuser
when(authorizationService.isSuperUser(anyString(), any(AuthenticationDataSource.class)))
.thenAnswer((invocationOnMock -> {
AuthenticationDataSource authData = invocationOnMock.getArgument(1, AuthenticationDataSource.class);
String user = authData.getHttpHeader("mockedUser");
return CompletableFuture.completedFuture(superUser.equals(user));
}));
AuthenticationDataSource authData = mock(AuthenticationDataSource.class);
when(authData.getHttpHeader("mockedUser")).thenReturn(superUser);
assertTrue(functionImpl.isSuperUser("non-superuser", authData));

AuthenticationDataSource nonSuperuserAuthData = mock(AuthenticationDataSource.class);
when(nonSuperuserAuthData.getHttpHeader("mockedUser")).thenReturn("non-superuser");
assertFalse(functionImpl.isSuperUser("non-superuser", nonSuperuserAuthData));
}

public static FunctionConfig createDefaultFunctionConfig() {
Expand Down

0 comments on commit 76c93a3

Please sign in to comment.