Skip to content

Commit

Permalink
[pulsar-admin] Add --all option to get all version schema of topic (a…
Browse files Browse the repository at this point in the history
…pache#12535)

### Motivation
Add `--all` option to get all version schema of topic when we use CLI `./bin/pulsar-admin schemas get tenant/ns/tp`
  • Loading branch information
yuruguo authored and eolivelli committed Nov 29, 2021
1 parent a373965 commit f2ccfbb
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.io.File;
import java.lang.reflect.Field;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
Expand All @@ -40,6 +45,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
import org.apache.pulsar.client.admin.Bookies;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.Brokers;
Expand All @@ -59,6 +65,7 @@
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
Expand Down Expand Up @@ -92,6 +99,7 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
Expand Down Expand Up @@ -1612,6 +1620,63 @@ void transactions() throws Exception {
verify(transactions).getPendingAckInternalStats("test", "test", false);
}

@Test
void schemas() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Schemas schemas = Mockito.mock(Schemas.class);
doReturn(schemas).when(admin).schemas();

CmdSchemas cmdSchemas = new CmdSchemas(() -> admin);
cmdSchemas.run(split("get -v 1 persistent://tn1/ns1/tp1"));
verify(schemas).getSchemaInfo("persistent://tn1/ns1/tp1", 1);

cmdSchemas = new CmdSchemas(() -> admin);
cmdSchemas.run(split("get -a persistent://tn1/ns1/tp1"));
verify(schemas).getAllSchemas("persistent://tn1/ns1/tp1");

cmdSchemas = new CmdSchemas(() -> admin);
cmdSchemas.run(split("get persistent://tn1/ns1/tp1"));
verify(schemas).getSchemaInfoWithVersion("persistent://tn1/ns1/tp1");

cmdSchemas = new CmdSchemas(() -> admin);
cmdSchemas.run(split("delete persistent://tn1/ns1/tp1"));
verify(schemas).deleteSchema("persistent://tn1/ns1/tp1");

cmdSchemas = new CmdSchemas(() -> admin);
String schemaFile = PulsarAdminToolTest.class.getClassLoader()
.getResource("test_schema_create.json").getFile();
cmdSchemas.run(split("upload -f " + schemaFile + " persistent://tn1/ns1/tp1"));
PostSchemaPayload input = new ObjectMapper().readValue(new File(schemaFile), PostSchemaPayload.class);
verify(schemas).createSchema("persistent://tn1/ns1/tp1", input);

cmdSchemas = new CmdSchemas(() -> admin);
String jarFile = PulsarAdminToolTest.class.getClassLoader()
.getResource("dummyexamples.jar").getFile();
String className = SchemaDemo.class.getName();
cmdSchemas.run(split("extract -j " + jarFile + " -c " + className + " -t json persistent://tn1/ns1/tp1"));
File file = new File(jarFile);
ClassLoader cl = new URLClassLoader(new URL[]{file.toURI().toURL()});
Class cls = cl.loadClass(className);
SchemaDefinition<Object> schemaDefinition =
SchemaDefinition.builder()
.withPojo(cls)
.withAlwaysAllowNull(true)
.build();
PostSchemaPayload postSchemaPayload = new PostSchemaPayload();
postSchemaPayload.setType("JSON");
postSchemaPayload.setSchema(SchemaExtractor.getJsonSchemaInfo(schemaDefinition));
postSchemaPayload.setProperties(schemaDefinition.getProperties());
verify(schemas).createSchema("persistent://tn1/ns1/tp1", postSchemaPayload);
}

public static class SchemaDemo {
public SchemaDemo() {
}

public static void main(String[] args) {
}
}

String[] split(String s) {
return s.split(" ");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type":"json",
"schema":""
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.admin.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
Expand Down Expand Up @@ -48,16 +49,27 @@ private class GetSchema extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--version" }, description = "version", required = false)
@Parameter(names = {"-v", "--version"}, description = "version", required = false)
private Long version;

@Parameter(names = {"-a", "--all-version"}, description = "all version", required = false)
private boolean all = false;

@Override
void run() throws Exception {
String topic = validateTopicName(params);
if (version == null) {
if (version != null && all) {
throw new ParameterException("Only one or neither of --version and --all-version can be specified.");
}
if (version == null && !all) {
System.out.println(getAdmin().schemas().getSchemaInfoWithVersion(topic));
} else {
} else if (!all) {
if (version < 0) {
throw new ParameterException("Option --version must be greater than 0, but found " + version);
}
System.out.println(getAdmin().schemas().getSchemaInfo(topic, version));
} else {
print(getAdmin().schemas().getAllSchemas(topic));
}
}
}
Expand Down

0 comments on commit f2ccfbb

Please sign in to comment.