Skip to content

Commit

Permalink
[improve][schema] Change update schema auth from tenant to produce (a…
Browse files Browse the repository at this point in the history
…pache#18074)

(cherry picked from commit 26b47ff)
  • Loading branch information
congbobo184 authored and liangyepianzhou committed Dec 13, 2022
1 parent 0cc5dce commit 3b9f854
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -748,40 +748,40 @@ protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityS
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
.thenCompose((__) -> {
CompletableFuture<SchemaCompatibilityStrategy> future;
if (config().isTopicLevelPoliciesEnabled()) {
future = getTopicPoliciesAsyncWithRetry(topicName)
.thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
} else {
future = CompletableFuture.completedFuture(null);
}

return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
return CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
}
return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy =
policies.schema_compatibility_strategy;
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
}
}
return schemaCompatibilityStrategy;
});
});
}).whenComplete((__, ex) -> {
.thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get schema compatibility strategy of topic {} {}",
clientAppId(), topicName, ex);
}
});
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsyncWithoutAuth() {
CompletableFuture<SchemaCompatibilityStrategy> future = CompletableFuture.completedFuture(null);
if (config().isTopicLevelPoliciesEnabled()) {
future = getTopicPoliciesAsyncWithRetry(topicName)
.thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
}

return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
return CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
}
return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy =
policies.schema_compatibility_strategy;
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
}
}
return schemaCompatibilityStrategy;
});
});
}

@CanIgnoreReturnValue
public static <T> T checkNotNull(T reference) {
return com.google.common.base.Preconditions.checkNotNull(reference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -133,9 +134,8 @@ public void deleteSchema(boolean authoritative, AsyncResponse response) {
}

public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> {
validateOwnershipAndOperation(authoritative, TopicOperation.PRODUCE);
getSchemaCompatibilityStrategyAsyncWithoutAuth().thenAccept(schemaCompatibilityStrategy -> {
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
private static final String CONSUME_TOKEN = Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();

private static final String PRODUCE_TOKEN = Jwts.builder().setSubject("producer").signWith(SECRET_KEY).compact();

@BeforeMethod
@Override
public void setup() throws Exception {
Expand Down Expand Up @@ -109,11 +111,18 @@ public void testGetCreateDeleteSchema() throws Exception {
.serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
.authentication(AuthenticationToken.class.getName(), CONSUME_TOKEN)
.build();

PulsarAdmin adminWithProducePermission = PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
.authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN)
.build();
admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce));

SchemaInfo si = Schema.BOOL.getSchemaInfo();
assertThrows(PulsarAdminException.class, () -> adminWithConsumePermission.schemas().getSchemaInfo(topicName));
assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().createSchema(topicName, si));
adminWithProducePermission.schemas().createSchema(topicName, si);
adminWithAdminPermission.schemas().createSchema(topicName, si);

assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName));
Expand Down

0 comments on commit 3b9f854

Please sign in to comment.