Skip to content

Commit

Permalink
[improve][pip] PIP-360 Add admin API to display Schema metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Jun 19, 2024
1 parent c2702e9 commit ba196e6
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
Expand Down Expand Up @@ -105,6 +108,13 @@ public CompletableFuture<List<SchemaAndMetadata>> getAllSchemasAsync(boolean aut
});
}

public CompletableFuture<SchemaMetadata> getSchemaMetadataAsync(boolean authoritative) {
String schemaId = getSchemaId();
BookkeeperSchemaStorage storage = (BookkeeperSchemaStorage) pulsar().getSchemaStorage();
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenCompose(__ -> storage.getSchemaMetadata(schemaId));
}

public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
return validateDestinationAndAdminOperationAsync(authoritative)
.thenCompose(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.broker.admin.impl.SchemasResourceBase;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
Expand Down Expand Up @@ -170,6 +171,39 @@ public void getAllSchemas(
});
}

@GET
@Path("/{tenant}/{cluster}/{namespace}/{topic}/metadata")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get the schema metadata of a topic", response = SchemaMetadata.class)
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
@ApiResponse(code = 403, message = "Client is not authenticated"),
@ApiResponse(code = 404,
message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
@ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
@ApiResponse(code = 500, message = "Internal Server Error"),
})
public void getSchemaMetadata(
@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@Suspended final AsyncResponse response
) {
validateTopicName(tenant, cluster, namespace, topic);
getSchemaMetadataAsync(authoritative)
.thenAccept(response::resume)
.exceptionally(ex -> {
if (shouldPrintErrorLog(ex)) {
log.error("[{}] Failed to schema metadata for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(response, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{cluster}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,38 @@ public void getAllSchemas(
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/metadata")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get the schema metadata of a topic", response = GetAllVersionsSchemaResponse.class)
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
@ApiResponse(code = 403, message = "Client is not authenticated"),
@ApiResponse(code = 404,
message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
@ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
@ApiResponse(code = 500, message = "Internal Server Error"),
})
public void getSchemaMetadata(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@Suspended final AsyncResponse response
) {
validateTopicName(tenant, namespace, topic);
getSchemaMetadataAsync(authoritative)
.thenAccept(response::resume)
.exceptionally(ex -> {
if (shouldPrintErrorLog(ex)) {
log.error("[{}] Failed to schema metadata for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(response, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry;
import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
Expand Down Expand Up @@ -554,6 +557,23 @@ private CompletableFuture<Optional<LocatorEntry>> getSchemaLocator(String schema
o.map(r -> new LocatorEntry(r.getValue(), r.getStat().getVersion())));
}

public CompletableFuture<SchemaMetadata> getSchemaMetadata(String schema) {
return getLocator(schema).thenApply(locator -> {
if (!locator.isPresent()) {
return null;
}
SchemaLocator sl = locator.get().locator;
SchemaMetadata metadata = new SchemaMetadata();
IndexEntry info = sl.getInfo();
metadata.info = new SchemaMetadata.Entry(info.getPosition().getLedgerId(), info.getPosition().getEntryId(),
info.getVersion());
metadata.index = sl.getIndexList() == null ? null
: sl.getIndexList().stream().map(i -> new SchemaMetadata.Entry(i.getPosition().getLedgerId(),
i.getPosition().getEntryId(), i.getVersion())).collect(Collectors.toList());
return metadata;
});
}

@NotNull
private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, SchemaStorageFormat.SchemaEntry entry) {
final CompletableFuture<Long> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -84,6 +85,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
Expand Down Expand Up @@ -1486,4 +1488,28 @@ public SchemaStorageFormat.SchemaLocator deserialize(String path, byte[] content
consumer.close();
producer.close();
}

@Test
public void testTopicSchemaMetadata() throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topicOne = "metadata-topic";
final String topicName = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString();

admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));

@Cleanup
Producer<Schemas.PersonTwo> producer = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false)
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
.topic(topicName).create();

SchemaMetadata metadata = admin.schemas().getSchemaMetadata(topicName);

assertNotNull(metadata);
assertNotNull(metadata.info);
assertNotEquals(metadata.info.getLedgerId(), 0);
assertEquals(metadata.info.getEntryId(), 0);
assertEquals(metadata.index.size(), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -233,4 +234,19 @@ IsCompatibilityResponse testCompatibility(String topic, PostSchemaPayload schema
* @param topic topic name, in fully qualified format
*/
CompletableFuture<List<SchemaInfo>> getAllSchemasAsync(String topic);

/**
* Get schema metadata of the <tt>topic</tt>.
*
* @param topic topic name, in fully qualified format
* @throws PulsarAdminException
*/
SchemaMetadata getSchemaMetadata(String topic) throws PulsarAdminException;

/**
* Get schema metadata of the <tt>topic</tt> asynchronously.
*
* @param topic topic name, in fully qualified format
*/
CompletableFuture<SchemaMetadata> getSchemaMetadataAsync(String topic);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Schema metadata info.
*/
@Data
public class SchemaMetadata {

public Entry info;
public List<Entry> index;

@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Entry {
private long ledgerId;
private long entryId;
private long version;

@Override
public String toString() {
return String.format("ledgerId=[%d], entryId=[%d], version=[%d]", ledgerId, entryId, version);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaMetadata;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
Expand Down Expand Up @@ -276,6 +277,19 @@ public CompletableFuture<List<SchemaInfo>> getAllSchemasAsync(String topic) {
.collect(Collectors.toList()));
}

@Override
public SchemaMetadata getSchemaMetadata(String topic) throws PulsarAdminException {
return sync(() -> getSchemaMetadataAsync(topic));
}

@Override
public CompletableFuture<SchemaMetadata> getSchemaMetadataAsync(String topic) {
TopicName tn = TopicName.get(topic);
WebTarget path = metadata(tn);
return asyncGetRequest(path, new FutureCallback<SchemaMetadata>(){});
}


private WebTarget schemaPath(TopicName topicName) {
return topicPath(topicName, "schema");
}
Expand All @@ -292,6 +306,10 @@ private WebTarget compatibilityPath(TopicName topicName) {
return topicPath(topicName, "compatibility");
}

private WebTarget metadata(TopicName topicName) {
return topicPath(topicName, "metadata");
}

private WebTarget topicPath(TopicName topic, String... parts) {
final WebTarget base = topic.isV2() ? adminV2 : adminV1;
WebTarget topicPath = base.path(topic.getRestPath(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public CmdSchemas(Supplier<PulsarAdmin> admin) {
addCommand("delete", new DeleteSchema());
addCommand("upload", new UploadSchema());
addCommand("extract", new ExtractSchema());
addCommand("metadata", new GetSchemaMetadata());
addCommand("compatibility", new TestCompatibility());
}

Expand Down Expand Up @@ -77,6 +78,18 @@ void run() throws Exception {
}
}

@Command(description = "Get the schema for a topic")
private class GetSchemaMetadata extends CliCommand {
@Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
private String topicName;

@Override
void run() throws Exception {
String topic = validateTopicName(topicName);
print(getAdmin().schemas().getSchemaMetadata(topic));
}
}

@Command(description = "Delete all versions schema of a topic")
private class DeleteSchema extends CliCommand {
@Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
Expand Down
Loading

0 comments on commit ba196e6

Please sign in to comment.