Skip to content

Commit

Permalink
[Broker] Fix read schema compatibility strategy priority
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Jan 26, 2022
1 parent 10036d5 commit 74c508f
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 81 deletions.
8 changes: 3 additions & 5 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1290,12 +1290,10 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
# if you enable this setting, it will cause non-java clients failed to produce.
isSchemaValidationEnforced=false

# The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`,
# broker will use it in broker level. If schemaCompatibilityStrategy is `UNDEFINED` will use `FULL`.
# SchemaCompatibilityStrategy : UNDEFINED, ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD,
# The schema compatibility strategy in broker level.
# SchemaCompatibilityStrategy : ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD,
# FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
# default : UNDEFINED
schemaCompatibilityStrategy=
schemaCompatibilityStrategy=FULL

### --- Ledger Offloading --- ###

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2195,10 +2195,9 @@ public class ServiceConfiguration implements PulsarConfiguration {

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`"
+ ", schema compatibility strategy check will use it in broker level."
doc = "The schema compatibility strategy in broker level"
)
private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.UNDEFINED;
private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL;

/**** --- WebSocket. --- ****/
@FieldContext(
Expand Down Expand Up @@ -2610,4 +2609,10 @@ public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() {
}
}

public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
return SchemaCompatibilityStrategy.FULL;
}
return schemaCompatibilityStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
Expand Down Expand Up @@ -764,6 +765,20 @@ protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Thr
}
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
}
}
return schemaCompatibilityStrategy;
});
}

@CanIgnoreReturnValue
public static <T> T checkNotNull(T reference) {
return com.google.common.base.Preconditions.checkNotNull(reference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2342,15 +2342,13 @@ protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);

SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
}
if (!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
return schemaCompatibilityStrategy;
}
return schemaCompatibilityStrategy;

return SchemaCompatibilityStrategy.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
Expand Down Expand Up @@ -136,16 +134,7 @@ public void deleteSchema(boolean authoritative, AsyncResponse response) {
public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy =
pulsar().getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
}
}
getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> {
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
try {
Expand Down Expand Up @@ -199,26 +188,17 @@ public void testCompatibility(PostSchemaPayload payload, boolean authoritative,
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();
Policies policies = getNamespacePolicies(namespaceName);

SchemaCompatibilityStrategy schemaCompatibilityStrategy;
if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
}

pulsar().getSchemaRegistryService()
.isCompatible(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(),
schemaCompatibilityStrategy)
.thenAccept(isCompatible -> response.resume(Response.accepted()
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
.build()))
getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy -> pulsar()
.getSchemaRegistryService().isCompatible(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(),
schemaCompatibilityStrategy)
.thenAccept(isCompatible -> response.resume(Response.accepted()
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
.build())))
.exceptionally(error -> {
response.resume(new RestException(error));
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,17 +643,19 @@ protected void setSchemaCompatibilityStrategy(Policies policies) {
if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
schemaCompatibilityStrategy =
brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
} else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = brokerService.pulsar()
.getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
return;
}

schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = brokerService.pulsar().getConfig().getSchemaCompatibilityStrategy();
}
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
}
}

private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
.quantile(0.0)
.quantile(0.50)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -42,9 +40,6 @@
@Slf4j
@Test(groups = "broker-admin")
public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest {

private static final Logger LOG = LoggerFactory.getLogger(AdminApiSchemaAutoUpdateTest.class);

@BeforeMethod
@Override
public void setup() throws Exception {
Expand All @@ -67,8 +62,7 @@ public void cleanup() throws Exception {
}

private void testAutoUpdateBackward(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null);
admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
SchemaAutoUpdateCompatibilityStrategy.Backward);

Expand All @@ -91,8 +85,7 @@ private void testAutoUpdateBackward(String namespace, String topicName) throws E
}

private void testAutoUpdateForward(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null);
admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
SchemaAutoUpdateCompatibilityStrategy.Forward);

Expand All @@ -114,8 +107,7 @@ private void testAutoUpdateForward(String namespace, String topicName) throws Ex
}

private void testAutoUpdateFull(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null);

try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
p.send(new V1Data("test1", 1));
Expand All @@ -142,8 +134,7 @@ private void testAutoUpdateFull(String namespace, String topicName) throws Excep
}

private void testAutoUpdateDisabled(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null);
admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
Expand All @@ -44,9 +45,12 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand All @@ -60,6 +64,8 @@
public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {

final String cluster = "test";
private final String schemaCompatibilityNamespace = "schematest/test-schema-compatibility-ns";

@BeforeMethod
@Override
public void setup() throws Exception {
Expand All @@ -71,6 +77,7 @@ public void setup() throws Exception {
admin.tenants().createTenant("schematest", tenantInfo);
admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
admin.namespaces().createNamespace("schematest/"+cluster+"/test", Sets.newHashSet("test"));
admin.namespaces().createNamespace(schemaCompatibilityNamespace, Sets.newHashSet("test"));
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -348,4 +355,57 @@ public long getCToken() {
assertEquals(ledgerInfo.entries, entryId + 1);
assertEquals(ledgerInfo.size, length);
}

@Test
public void testGetSchemaCompatibilityStrategy() throws PulsarAdminException {
assertNull(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace));
}

@Test
public void testGetSchemaAutoUpdateCompatibilityStrategy() throws PulsarAdminException {
assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace));
}

@Test
public void testGetSchemaCompatibilityStrategyWhenSetSchemaAutoUpdateCompatibilityStrategy()
throws PulsarAdminException {
assertNull(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace));

admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace,
SchemaAutoUpdateCompatibilityStrategy.Forward);
Awaitility.await().untilAsserted(() -> assertEquals(SchemaAutoUpdateCompatibilityStrategy.Forward,
admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace)
));

admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace,
SchemaCompatibilityStrategy.BACKWARD);
Awaitility.await().untilAsserted(() -> assertEquals(SchemaCompatibilityStrategy.BACKWARD,
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace)));
}

@Test
public void testGetSchemaCompatibilityStrategyWhenSetBrokerLevelAndSchemaAutoUpdateCompatibilityStrategy()
throws PulsarAdminException {
pulsar.getConfiguration().setSchemaCompatibilityStrategy(SchemaCompatibilityStrategy.FORWARD);

assertNull(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace));

admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace,
SchemaAutoUpdateCompatibilityStrategy.AlwaysCompatible);
Awaitility.await().untilAsserted(() -> assertEquals(
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE));

admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace,
SchemaCompatibilityStrategy.BACKWARD);
Awaitility.await().untilAsserted(() -> assertEquals(
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
SchemaCompatibilityStrategy.BACKWARD));

admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace,
SchemaCompatibilityStrategy.UNDEFINED);
Awaitility.await().untilAsserted(() -> assertEquals(
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE));
}
}
Loading

0 comments on commit 74c508f

Please sign in to comment.