diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index f90a2846418572..529c1ea19a4b6b 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -28,10 +28,15 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; + +import java.io.File; import java.lang.reflect.Field; +import java.net.URL; +import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -40,6 +45,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.admin.cli.utils.SchemaExtractor; import org.apache.pulsar.client.admin.Bookies; import org.apache.pulsar.client.admin.BrokerStats; import org.apache.pulsar.client.admin.Brokers; @@ -59,6 +65,7 @@ import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; @@ -92,6 +99,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.protocol.schema.PostSchemaPayload; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; @@ -1612,6 +1620,63 @@ void transactions() throws Exception { verify(transactions).getPendingAckInternalStats("test", "test", false); } + @Test + void schemas() throws Exception { + PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); + Schemas schemas = Mockito.mock(Schemas.class); + doReturn(schemas).when(admin).schemas(); + + CmdSchemas cmdSchemas = new CmdSchemas(() -> admin); + cmdSchemas.run(split("get -v 1 persistent://tn1/ns1/tp1")); + verify(schemas).getSchemaInfo("persistent://tn1/ns1/tp1", 1); + + cmdSchemas = new CmdSchemas(() -> admin); + cmdSchemas.run(split("get -a persistent://tn1/ns1/tp1")); + verify(schemas).getAllSchemas("persistent://tn1/ns1/tp1"); + + cmdSchemas = new CmdSchemas(() -> admin); + cmdSchemas.run(split("get persistent://tn1/ns1/tp1")); + verify(schemas).getSchemaInfoWithVersion("persistent://tn1/ns1/tp1"); + + cmdSchemas = new CmdSchemas(() -> admin); + cmdSchemas.run(split("delete persistent://tn1/ns1/tp1")); + verify(schemas).deleteSchema("persistent://tn1/ns1/tp1"); + + cmdSchemas = new CmdSchemas(() -> admin); + String schemaFile = PulsarAdminToolTest.class.getClassLoader() + .getResource("test_schema_create.json").getFile(); + cmdSchemas.run(split("upload -f " + schemaFile + " persistent://tn1/ns1/tp1")); + PostSchemaPayload input = new ObjectMapper().readValue(new File(schemaFile), PostSchemaPayload.class); + verify(schemas).createSchema("persistent://tn1/ns1/tp1", input); + + cmdSchemas = new CmdSchemas(() -> admin); + String jarFile = PulsarAdminToolTest.class.getClassLoader() + .getResource("dummyexamples.jar").getFile(); + String className = SchemaDemo.class.getName(); + cmdSchemas.run(split("extract -j " + jarFile + " -c " + className + " -t json persistent://tn1/ns1/tp1")); + File file = new File(jarFile); + ClassLoader cl = new URLClassLoader(new URL[]{file.toURI().toURL()}); + Class cls = cl.loadClass(className); + SchemaDefinition schemaDefinition = + SchemaDefinition.builder() + .withPojo(cls) + .withAlwaysAllowNull(true) + .build(); + PostSchemaPayload postSchemaPayload = new PostSchemaPayload(); + postSchemaPayload.setType("JSON"); + postSchemaPayload.setSchema(SchemaExtractor.getJsonSchemaInfo(schemaDefinition)); + postSchemaPayload.setProperties(schemaDefinition.getProperties()); + verify(schemas).createSchema("persistent://tn1/ns1/tp1", postSchemaPayload); + } + + public static class SchemaDemo { + public SchemaDemo() { + } + + public static void main(String[] args) { + } + } + String[] split(String s) { return s.split(" "); } diff --git a/pulsar-client-tools-test/src/test/resources/test_schema_create.json b/pulsar-client-tools-test/src/test/resources/test_schema_create.json new file mode 100644 index 00000000000000..241a985c073f71 --- /dev/null +++ b/pulsar-client-tools-test/src/test/resources/test_schema_create.json @@ -0,0 +1,4 @@ +{ + "type":"json", + "schema":"" +} \ No newline at end of file diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java index 045f64e0f12f5b..302f6e2f5c8bc3 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java @@ -19,6 +19,7 @@ package org.apache.pulsar.admin.cli; import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; import com.beust.jcommander.Parameters; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; @@ -48,16 +49,27 @@ private class GetSchema extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; - @Parameter(names = { "--version" }, description = "version", required = false) + @Parameter(names = {"-v", "--version"}, description = "version", required = false) private Long version; + @Parameter(names = {"-a", "--all-version"}, description = "all version", required = false) + private boolean all = false; + @Override void run() throws Exception { String topic = validateTopicName(params); - if (version == null) { + if (version != null && all) { + throw new ParameterException("Only one or neither of --version and --all-version can be specified."); + } + if (version == null && !all) { System.out.println(getAdmin().schemas().getSchemaInfoWithVersion(topic)); - } else { + } else if (!all) { + if (version < 0) { + throw new ParameterException("Option --version must be greater than 0, but found " + version); + } System.out.println(getAdmin().schemas().getSchemaInfo(topic, version)); + } else { + print(getAdmin().schemas().getAllSchemas(topic)); } } }