Skip to content

Commit

Permalink
[improve][schema] Add admin cli for testCompatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 committed Mar 31, 2023
1 parent 68c10ee commit 00da970
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -449,4 +451,31 @@ public void testGetSchemaCompatibilityStrategyWhenSetBrokerLevelAndSchemaAutoUpd
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
SchemaCompatibilityStrategy.UNDEFINED));
}

@Test
public void testCompatibility() throws Exception {
String topicName = schemaCompatibilityNamespace + "/testCompatibility";
try {
admin.schemas().getSchemaInfo(topicName);
fail();
} catch (PulsarAdminException.NotFoundException e) {
assertEquals(e.getMessage(), "Schema not found");
}
Map<String, String> properties = new HashMap<>();
PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
admin.schemas().createSchema(topicName, postSchemaPayload);
IsCompatibilityResponse isCompatibilityResponse =
admin.schemas().testCompatibility(topicName, postSchemaPayload);

assertTrue(isCompatibilityResponse.isCompatibility());
assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.FULL.name());
postSchemaPayload = new PostSchemaPayload("INT8", "", properties);
try {
admin.schemas().testCompatibility(topicName, postSchemaPayload);
fail();
} catch (Exception e) {
assertTrue(e instanceof PulsarAdminException.ServerSideErrorException);
assertTrue(e.getMessage().contains("Incompatible schema: exists schema type STRING, new schema type INT8"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,4 @@
public class IsCompatibilityResponse {
boolean isCompatibility;
String schemaCompatibilityStrategy;

}
Original file line number Diff line number Diff line change
Expand Up @@ -2355,6 +2355,11 @@ void schemas() throws Exception {
PostSchemaPayload input = new ObjectMapper().readValue(new File(schemaFile), PostSchemaPayload.class);
verify(schemas).createSchema("persistent://tn1/ns1/tp1", input);

cmdSchemas = new CmdSchemas(() -> admin);
cmdSchemas.run(split("test-compatibility -f " + schemaFile + " persistent://tn1/ns1/tp1"));
input = new ObjectMapper().readValue(new File(schemaFile), PostSchemaPayload.class);
verify(schemas).testCompatibility("persistent://tn1/ns1/tp1", input);

cmdSchemas = new CmdSchemas(() -> admin);
String jarFile = PulsarAdminToolTest.class.getClassLoader()
.getResource("dummyexamples.jar").getFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public CmdSchemas(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("delete", new DeleteSchema());
jcommander.addCommand("upload", new UploadSchema());
jcommander.addCommand("extract", new ExtractSchema());
jcommander.addCommand("test-compatibility", new TestCompatibility());
}

@Parameters(commandDescription = "Get the schema for a topic")
Expand Down Expand Up @@ -164,4 +165,20 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "Test schema compatibility")
private class TestCompatibility extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-f", "--filename" }, description = "filename", required = true)
private String schemaFileName;

@Override
void run() throws Exception {
String topic = validateTopicName(params);
PostSchemaPayload input = MAPPER.readValue(new File(schemaFileName), PostSchemaPayload.class);
getAdmin().schemas().testCompatibility(topic, input);
}
}

}

0 comments on commit 00da970

Please sign in to comment.