Skip to content

Commit

Permalink
[improve][cli] Allow pulsa-client produce to specify KeyValue schema …
Browse files Browse the repository at this point in the history
…key (Avro Key support) (#20447)
  • Loading branch information
nicoloboschi authored Jun 29, 2023
1 parent 204905e commit 7bf8fd1
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.io.File;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
Expand All @@ -30,18 +33,24 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.NoArgsConstructor;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class PulsarClientToolTest extends BrokerTestBase {
Expand Down Expand Up @@ -395,4 +404,152 @@ private static String getTopicWithRandomSuffix(String localNameBase) {
return String.format("persistent://prop/ns-abc/test/%s-%s", localNameBase, UUID.randomUUID().toString());
}


@Test(timeOut = 20000)
public void testProducePartitioningKey() throws Exception {

Properties properties = initializeToolProperties();

final String topicName = getTopicWithRandomSuffix("key-topic");

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
executor.execute(() -> {
try {
PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties);
String[] args = {"produce", "-m", "test", "-k", "partition-key1", topicName};
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
final Message<byte[]> message = consumer.receive(10, TimeUnit.SECONDS);
assertNotNull(message);
assertTrue(message.hasKey());
Assert.assertEquals(message.getKey(), "partition-key1");
}

@NoArgsConstructor
@AllArgsConstructor
public static class TestKey {
public String key_a;
public int key_b;

}

@Test
public void testProduceKeyValueSchemaInlineValue() throws Exception {

Properties properties = initializeToolProperties();

final String topicName = getTopicWithRandomSuffix("key-topic");


@Cleanup
Consumer<KeyValue<TestKey, String>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.JSON(
TestKey.class), Schema.STRING)).topic(topicName).subscriptionName("sub").subscribe();

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
final Schema<TestKey> keySchema = Schema.JSON(TestKey.class);

executor.execute(() -> {
try {
PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties);
String[] args = {"produce",
"-kvet", "inline",
"-ks", String.format("json:%s", keySchema.getSchemaInfo().getSchemaDefinition()),
"-kvk", ObjectMapperFactory.getMapper().writer().writeValueAsString(new TestKey("my-key", Integer.MAX_VALUE)),
"-vs", "string",
"-m", "test",
topicName};
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
final Message<KeyValue<TestKey, String>> message = consumer.receive(10, TimeUnit.SECONDS);
assertNotNull(message);
assertFalse(message.hasKey());
Assert.assertEquals(message.getValue().getKey().key_a, "my-key");
Assert.assertEquals(message.getValue().getKey().key_b, Integer.MAX_VALUE);
Assert.assertEquals(message.getValue().getValue(), "test");
}

@DataProvider(name = "keyValueKeySchema")
public static Object[][] keyValueKeySchema() {
return new Object[][]{
{"json"},
{"avro"}
};
}

@Test(dataProvider = "keyValueKeySchema")
public void testProduceKeyValueSchemaFileValue(String schema) throws Exception {

Properties properties = initializeToolProperties();

final String topicName = getTopicWithRandomSuffix("key-topic");



@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
File file = Files.createTempFile("", "").toFile();
final Schema<TestKey> keySchema;
if (schema.equals("json")) {
keySchema = Schema.JSON(TestKey.class);
} else if (schema.equals("avro")) {
keySchema = Schema.AVRO(TestKey.class);
} else {
throw new IllegalStateException();
}


Files.write(file.toPath(), keySchema.encode(new TestKey("my-key", Integer.MAX_VALUE)));

@Cleanup
Consumer<KeyValue<TestKey, String>> consumer = pulsarClient.newConsumer(Schema.KeyValue(keySchema, Schema.STRING))
.topic(topicName).subscriptionName("sub").subscribe();

executor.execute(() -> {
try {
PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties);
String[] args = {"produce",
"-k", "partitioning-key",
"-kvet", "inline",
"-ks", String.format("%s:%s", schema, keySchema.getSchemaInfo().getSchemaDefinition()),
"-kvkf", file.getAbsolutePath(),
"-vs", "string",
"-m", "test",
topicName};
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
final Message<KeyValue<TestKey, String>> message = consumer.receive(10, TimeUnit.SECONDS);
assertNotNull(message);
// -k should not be considered
assertFalse(message.hasKey());
Assert.assertEquals(message.getValue().getKey().key_a, "my-key");
Assert.assertEquals(message.getValue().getKey().key_b, Integer.MAX_VALUE);
}

private Properties initializeToolProperties() {
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
return properties;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,14 @@ public class CmdProduce {
+ "key=value string, like k1=v1,k2=v2.")
private List<String> properties = new ArrayList<>();

@Parameter(names = { "-k", "--key"}, description = "message key to add ")
@Parameter(names = { "-k", "--key"}, description = "Partitioning key to add to each message")
private String key;
@Parameter(names = { "-kvk", "--key-value-key"}, description = "Value to add as message key in KeyValue schema")
private String keyValueKey;
@Parameter(names = { "-kvkf", "--key-value-key-file"},
description = "Path to file containing the value to add as message key in KeyValue schema. "
+ "JSON and AVRO files are supported.")
private String keyValueKeyFile;

@Parameter(names = { "-vs", "--value-schema"}, description = "Schema type (can be bytes,avro,json,string...)")
private String valueSchema = "bytes";
Expand Down Expand Up @@ -268,6 +274,25 @@ private int publish(String topic) {
kvMap.put(kv[0], kv[1]);
}

final byte[] keyValueKeyBytes;
if (this.keyValueKey != null) {
if (keyValueEncodingType == KEY_VALUE_ENCODING_TYPE_NOT_SET) {
throw new ParameterException(
"Key value encoding type must be set when using --key-value-key");
}
keyValueKeyBytes = this.keyValueKey.getBytes(StandardCharsets.UTF_8);
} else if (this.keyValueKeyFile != null) {
if (keyValueEncodingType == KEY_VALUE_ENCODING_TYPE_NOT_SET) {
throw new ParameterException(
"Key value encoding type must be set when using --key-value-key-file");
}
keyValueKeyBytes = Files.readAllBytes(Paths.get(this.keyValueKeyFile));
} else if (this.key != null) {
keyValueKeyBytes = this.key.getBytes(StandardCharsets.UTF_8);
} else {
keyValueKeyBytes = null;
}

for (int i = 0; i < this.numTimesProduce; i++) {
for (byte[] content : messageBodies) {
if (limiter != null) {
Expand All @@ -290,8 +315,7 @@ private int publish(String topic) {
case KEY_VALUE_ENCODING_TYPE_SEPARATED:
case KEY_VALUE_ENCODING_TYPE_INLINE:
KeyValue kv = new KeyValue<>(
// TODO: support AVRO encoded key
key != null ? key.getBytes(StandardCharsets.UTF_8) : null,
keyValueKeyBytes,
content);
message.value(kv);
break;
Expand Down

0 comments on commit 7bf8fd1

Please sign in to comment.