diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 2201ee3031c55..8aa678e9e9f0a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -630,6 +630,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
                             case COMPACT:
                             case OFFLOAD:
                             case UNLOAD:
+                            case DELETE_METADATA:
                             case ADD_BUNDLE_RANGE:
                             case GET_BUNDLE_RANGE:
                             case DELETE_BUNDLE_RANGE:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 82711096701f2..289b25df5ebd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -64,6 +64,7 @@
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -654,6 +655,117 @@ private CompletableFuture<Map<String, String>> getPropertiesAsync() {
         });
     }
 
+    protected CompletableFuture<Void> internalUpdatePropertiesAsync(boolean authoritative,
+                                                                    Map<String, String> properties) {
+        if (properties == null || properties.isEmpty()) {
+            log.warn("[{}] [{}] properties is empty, ignore update", clientAppId(), topicName);
+            return CompletableFuture.completedFuture(null);
+        }
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PRODUCE))
+            .thenCompose(__ -> {
+                if (topicName.isPartitioned()) {
+                    return internalUpdateNonPartitionedTopicProperties(properties);
+                } else {
+                    return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
+                        .thenCompose(metadata -> {
+                            if (metadata.partitions == 0) {
+                                return internalUpdateNonPartitionedTopicProperties(properties);
+                            }
+                            return namespaceResources()
+                                .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
+                                    p -> new PartitionedTopicMetadata(p.partitions,
+                                            p.properties == null ? properties
+                                                    : MapUtils.putAll(p.properties, properties.entrySet().toArray())));
+                        });
+                }
+            }).thenAccept(__ ->
+                log.info("[{}] [{}] update properties success with properties {}",
+                    clientAppId(), topicName, properties));
+    }
+
+    private CompletableFuture<Void> internalUpdateNonPartitionedTopicProperties(Map<String, String> properties) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+            .thenAccept(opt -> {
+                if (!opt.isPresent()) {
+                    throw new RestException(Status.NOT_FOUND,
+                        getTopicNotFoundErrorMessage(topicName.toString()));
+                }
+                ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
+                managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() {
+
+                    @Override
+                    public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                        if (managedLedger.getConfig().getProperties() == null) {
+                            managedLedger.getConfig().setProperties(new HashMap<>());
+                        }
+                        managedLedger.getConfig().getProperties().putAll(properties);
+
+                        future.complete(null);
+                    }
+
+                    @Override
+                    public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                        future.completeExceptionally(exception);
+                    }
+                }, null);
+            });
+        return future;
+    }
+
+    protected CompletableFuture<Void> internalRemovePropertiesAsync(boolean authoritative, String key) {
+        return validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA))
+                .thenCompose(__ -> {
+                    if (topicName.isPartitioned()) {
+                        return internalRemoveNonPartitionedTopicProperties(key);
+                    } else {
+                        return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
+                                .thenCompose(metadata -> {
+                                    if (metadata.partitions == 0) {
+                                        return internalRemoveNonPartitionedTopicProperties(key);
+                                    }
+                                    return namespaceResources()
+                                            .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
+                                                    p -> {
+                                                        if (p.properties != null) {
+                                                            p.properties.remove(key);
+                                                        }
+                                                        return new PartitionedTopicMetadata(p.partitions, p.properties);
+                                                    });
+                                });
+                    }
+                }).thenAccept(__ ->
+                        log.info("[{}] remove [{}] properties success with key {}",
+                                clientAppId(), topicName, key));
+    }
+
+    private CompletableFuture<Void> internalRemoveNonPartitionedTopicProperties(String key) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+                .thenAccept(opt -> {
+                    if (!opt.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND,
+                                getTopicNotFoundErrorMessage(topicName.toString()));
+                    }
+                    ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
+                    managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() {
+
+                        @Override
+                        public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                            future.complete(null);
+                        }
+
+                        @Override
+                        public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                            future.completeExceptionally(exception);
+                        }
+                    }, null);
+                });
+        return future;
+    }
+
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
                                                   boolean force, boolean deleteSchema) {
         validateTopicOwnershipAsync(topicName, authoritative)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 4dd4c1310cc62..278d9e29d8b51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -931,6 +931,77 @@ public void getProperties(
                 });
     }
 
+    @PUT
+    @Path("/{tenant}/{namespace}/{topic}/properties")
+    @ApiOperation(value = "Update the properties on the given topic.")
+    @ApiResponses(value = {
+        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+            + "subscriber is not authorized to access this operation"),
+        @ApiResponse(code = 403, message = "Don't have admin permission"),
+        @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+        @ApiResponse(code = 405, message = "Method Not Allowed"),
+        @ApiResponse(code = 500, message = "Internal server error"),
+        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void updateProperties(
+        @Suspended final AsyncResponse asyncResponse,
+        @ApiParam(value = "Specify the tenant", required = true)
+        @PathParam("tenant") String tenant,
+        @ApiParam(value = "Specify the namespace", required = true)
+        @PathParam("namespace") String namespace,
+        @ApiParam(value = "Specify topic name", required = true)
+        @PathParam("topic") @Encoded String encodedTopic,
+        @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
+        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+        @ApiParam(value = "Key value pair properties for the topic metadata") Map<String, String> properties){
+        validatePersistentTopicName(tenant, namespace, encodedTopic);
+        internalUpdatePropertiesAsync(authoritative, properties)
+            .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                if (!isRedirectException(ex)) {
+                    log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/properties")
+    @ApiOperation(value = "Remove the key in properties on the given topic.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Partitioned topic does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    public void removeProperties(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("key") String key,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validatePersistentTopicName(tenant, namespace, encodedTopic);
+        internalRemovePropertiesAsync(authoritative, key)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to remove key {} in properties on topic {}",
+                                clientAppId(), key, topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/partitions")
     @ApiOperation(value = "Delete a partitioned topic.",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 48b859bd11b87..b6077fda9de01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1469,7 +1469,8 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
         pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
                 .thenAccept(isActive -> {
                     if (isActive) {
-                        createPersistentTopic(topic, createIfMissing, topicFuture, properties, topicPolicies);
+                        createPersistentTopic(topic, createIfMissing, topicFuture,
+                                properties == null ? new HashMap<>() : properties, topicPolicies);
                     } else {
                         // namespace is being unloaded
                         String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 5b43aa7145e81..d84fab7137f86 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -902,6 +902,99 @@ public void testCreateAndGetTopicProperties() throws Exception {
         Assert.assertEquals(properties22.get("key2"), "value2");
     }
 
+    @Test
+    public void testUpdatePartitionedTopicProperties() throws Exception {
+        final String namespace = "prop-xyz/ns2";
+        final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties";
+        final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2";
+        admin.namespaces().createNamespace(namespace, 20);
+
+        // create partitioned topic without properties
+        admin.topics().createPartitionedTopic(topicName, 2);
+        Map<String, String> properties = admin.topics().getProperties(topicName);
+        Assert.assertNull(properties);
+        Map<String, String> topicProperties = new HashMap<>();
+        topicProperties.put("key1", "value1");
+        admin.topics().updateProperties(topicName, topicProperties);
+        properties = admin.topics().getProperties(topicName);
+        Assert.assertNotNull(properties);
+        Assert.assertEquals(properties.get("key1"), "value1");
+
+        // update with new key, old properties should keep
+        topicProperties = new HashMap<>();
+        topicProperties.put("key2", "value2");
+        admin.topics().updateProperties(topicName, topicProperties);
+        properties = admin.topics().getProperties(topicName);
+        Assert.assertNotNull(properties);
+        Assert.assertEquals(properties.size(), 2);
+        Assert.assertEquals(properties.get("key1"), "value1");
+        Assert.assertEquals(properties.get("key2"), "value2");
+
+        // override old values
+        topicProperties = new HashMap<>();
+        topicProperties.put("key1", "value11");
+        admin.topics().updateProperties(topicName, topicProperties);
+        properties = admin.topics().getProperties(topicName);
+        Assert.assertNotNull(properties);
+        Assert.assertEquals(properties.size(), 2);
+        Assert.assertEquals(properties.get("key1"), "value11");
+        Assert.assertEquals(properties.get("key2"), "value2");
+
+        // create topic without properties
+        admin.topics().createPartitionedTopic(topicNameTwo, 2);
+        properties = admin.topics().getProperties(topicNameTwo);
+        Assert.assertNull(properties);
+        // remove key of properties on this topic
+        admin.topics().removeProperties(topicNameTwo, "key1");
+        properties = admin.topics().getProperties(topicNameTwo);
+        Assert.assertNull(properties);
+        Map<String, String> topicProp = new HashMap<>();
+        topicProp.put("key1", "value1");
+        topicProp.put("key2", "value2");
+        admin.topics().updateProperties(topicNameTwo, topicProp);
+        properties = admin.topics().getProperties(topicNameTwo);
+        Assert.assertEquals(properties, topicProp);
+        admin.topics().removeProperties(topicNameTwo, "key1");
+        topicProp.remove("key1");
+        properties = admin.topics().getProperties(topicNameTwo);
+        Assert.assertEquals(properties, topicProp);
+    }
+
+    @Test
+    public void testUpdateNonPartitionedTopicProperties() throws Exception {
+        final String namespace = "prop-xyz/ns2";
+        final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties";
+        admin.namespaces().createNamespace(namespace, 20);
+
+        // create non-partitioned topic with properties
+        Map<String, String> topicProperties = new HashMap<>();
+        topicProperties.put("key1", "value1");
+        admin.topics().createNonPartitionedTopic(topicName, topicProperties);
+        Map<String, String> properties = admin.topics().getProperties(topicName);
+        Assert.assertNotNull(properties);
+        Assert.assertEquals(properties.get("key1"), "value1");
+
+        // update with new key, old properties should keep
+        topicProperties = new HashMap<>();
+        topicProperties.put("key2", "value2");
+        admin.topics().updateProperties(topicName, topicProperties);
+        properties = admin.topics().getProperties(topicName);
+        Assert.assertNotNull(properties);
+        Assert.assertEquals(properties.size(), 2);
+        Assert.assertEquals(properties.get("key1"), "value1");
+        Assert.assertEquals(properties.get("key2"), "value2");
+
+        // override old values
+        topicProperties = new HashMap<>();
+        topicProperties.put("key1", "value11");
+        admin.topics().updateProperties(topicName, topicProperties);
+        properties = admin.topics().getProperties(topicName);
+        Assert.assertNotNull(properties);
+        Assert.assertEquals(properties.size(), 2);
+        Assert.assertEquals(properties.get("key1"), "value11");
+        Assert.assertEquals(properties.get("key2"), "value2");
+    }
+
     @Test
     public void testNonPersistentTopics() throws Exception {
         final String namespace = "prop-xyz/ns2";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 376391e6db50f..201530e438254 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -32,6 +32,7 @@
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -184,6 +185,19 @@ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Excep
     }
 
 
+    @Test
+    public void updatePropertiesForAutoCreatedTopicTest() throws Exception {
+        TopicName topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                NamespaceName.get(myNamespace),
+                "test-" + UUID.randomUUID()
+        );
+        String testTopic = topicName.toString();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(testTopic).create();
+        HashMap<String, String> properties = new HashMap<>();
+        properties.put("backlogQuotaType", "message_age");
+        admin.topics().updateProperties(testTopic, properties);
+    }
     @Test
     public void testSetSizeBasedBacklogQuota() throws Exception {
 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 73f9a199a1b12..be706d048d686 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -679,6 +679,42 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
      */
     CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);
 
+    /**
+     * Update Topic Properties on a topic.
+     * The new properties will override the existing values, old properties in the topic will be keep if not override.
+     * @param topic
+     * @param properties
+     * @throws PulsarAdminException
+     */
+    void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException;
+
+    /**
+     * Update Topic Properties on a topic.
+     * The new properties will override the existing values, old properties in the topic will be keep if not override.
+     * @param topic
+     * @param properties
+     * @return
+     */
+    CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties);
+
+    /**
+     * Remove the key in properties on a topic.
+     *
+     * @param topic
+     * @param key
+     * @throws PulsarAdminException
+     */
+    void removeProperties(String topic, String key) throws PulsarAdminException;
+
+    /**
+     * Remove the key in properties on a topic asynchronously.
+     *
+     * @param topic
+     * @param key
+     * @return
+     */
+    CompletableFuture<Void> removePropertiesAsync(String topic, String key);
+
     /**
      * Delete a partitioned topic.
      * <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index dbdaffd9e5c5f..b4be27e0770a4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -474,11 +474,39 @@ public void failed(Throwable throwable) {
         return future;
     }
 
+    @Override
+    public void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException {
+        sync(() -> updatePropertiesAsync(topic, properties));
+    }
+
+    @Override
+    public CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "properties");
+        if (properties == null) {
+            properties = new HashMap<>();
+        }
+        return asyncPutRequest(path, Entity.entity(properties, MediaType.APPLICATION_JSON));
+    }
+
     @Override
     public void deletePartitionedTopic(String topic) throws PulsarAdminException {
         deletePartitionedTopic(topic, false);
     }
 
+    @Override
+    public void removeProperties(String topic, String key) throws PulsarAdminException {
+        sync(() -> removePropertiesAsync(topic, key));
+    }
+
+    @Override
+    public CompletableFuture<Void> removePropertiesAsync(String topic, String key) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "properties")
+                .queryParam("key", key);
+        return asyncDeleteRequest(path);
+    }
+
     @Override
     public CompletableFuture<Void> deletePartitionedTopicAsync(String topic) {
         return deletePartitionedTopicAsync(topic, false);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 8e7e339b536cc..ca4ba39a61b66 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1422,6 +1422,17 @@ public void topics() throws Exception {
         cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
         verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());
 
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("update-properties persistent://myprop/clust/ns1/ds1 --property a=b -p x=y,z"));
+        props = new HashMap<>();
+        props.put("a", "b");
+        props.put("x", "y,z");
+        verify(mockTopics).updateProperties("persistent://myprop/clust/ns1/ds1", props);
+
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("remove-properties persistent://myprop/clust/ns1/ds1 --key a"));
+        verify(mockTopics).removeProperties("persistent://myprop/clust/ns1/ds1", "a");
+
         cmdTopics = new CmdTopics(() -> admin);
         props = new HashMap<>();
         props.put("a", "b");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 056847c7583a1..eaa907726997d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -56,6 +56,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.cli.NoSplitter;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
@@ -119,6 +120,8 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
         jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
         jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
         jcommander.addCommand("get-properties", new GetPropertiesCmd());
+        jcommander.addCommand("update-properties", new UpdateProperties());
+        jcommander.addCommand("remove-properties", new RemoveProperties());
 
         jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
         jcommander.addCommand("peek-messages", new PeekMessages());
@@ -605,6 +608,41 @@ void run() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Update the properties of on a topic")
+    private class UpdateProperties extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)",
+            required = false, splitter = NoSplitter.class)
+        private java.util.List<String> properties;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            Map<String, String> map = parseListKeyValueMap(properties);
+            if (map == null) {
+                map = Collections.emptyMap();
+            }
+            getTopics().updateProperties(topic, map);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove the key in properties of a topic")
+    private class RemoveProperties extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--key", "-k"}, description = "The key to remove in the properties of topic")
+        private String key;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            getTopics().removeProperties(topic, key);
+        }
+    }
+
     @Parameters(commandDescription = "Delete a partitioned topic. "
             + "It will also delete all the partitions of the topic if it exists.")
     private class DeletePartitionedCmd extends CliCommand {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
index d4de706e607b7..0184e0efb82d7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -50,6 +50,7 @@ public enum TopicOperation {
 
     GET_STATS,
     GET_METADATA,
+    DELETE_METADATA,
     GET_BACKLOG_SIZE,
 
     SET_REPLICATED_SUBSCRIPTION_STATUS,