Skip to content

Commit 13e35f7

Browse files
yittgsijie
authored and committed
[PIP-43] Support producer to send msg with different schema (#5165)
Master Issue: #5141 ### Motivation Implement part-1 of [PIP-43](https://github.com/apache/pulsar/wiki/PIP-43%3A-producer-send-message-with-different-schema#changespart-1). ### Modifications * New message api to specify message schema explicitly; * Mechanism of registering schema on producing; * Batch message container support to check message schema; * Configuration for seamless introduction of this feature;
1 parent 0281e83 commit 13e35f7

File tree

23 files changed

+2749
-789
lines changed

23 files changed

+2749
-789
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+55-17
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
8282
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
8383
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
84+
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetOrCreateSchema;
8485
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
8586
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
8687
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
@@ -104,6 +105,7 @@
104105
import org.apache.pulsar.common.protocol.schema.SchemaData;
105106
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
106107
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
108+
import org.apache.pulsar.common.schema.SchemaType;
107109
import org.apache.pulsar.common.util.FutureUtil;
108110
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
109111
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
@@ -927,23 +929,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
927929

928930
disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
929931

930-
CompletableFuture<SchemaVersion> schemaVersionFuture;
931-
if (schema != null) {
932-
schemaVersionFuture = topic.addSchema(schema);
933-
} else {
934-
schemaVersionFuture = topic.hasSchema().thenCompose((hasSchema) -> {
935-
log.info("[{}]-{} {} configured with schema {}", remoteAddress, producerId,
936-
topicName, hasSchema);
937-
CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
938-
if (hasSchema && (schemaValidationEnforced || topic.getSchemaValidationEnforced())) {
939-
result.completeExceptionally(new IncompatibleSchemaException(
940-
"Producers cannot connect without a schema to topics with a schema"));
941-
} else {
942-
result.complete(SchemaVersion.Empty);
943-
}
944-
return result;
945-
});
946-
}
932+
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
947933

948934
schemaVersionFuture.exceptionally(exception -> {
949935
ctx.writeAndFlush(Commands.newError(requestId,
@@ -1375,6 +1361,58 @@ protected void handleGetSchema(CommandGetSchema commandGetSchema) {
13751361
});
13761362
}
13771363

1364+
@Override
1365+
protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCreateSchema) {
1366+
if (log.isDebugEnabled()) {
1367+
log.debug("Received CommandGetOrCreateSchema call from {}", remoteAddress);
1368+
}
1369+
long requestId = commandGetOrCreateSchema.getRequestId();
1370+
String topicName = commandGetOrCreateSchema.getTopic();
1371+
SchemaData schemaData = getSchema(commandGetOrCreateSchema.getSchema());
1372+
SchemaData schema = schemaData.getType() == SchemaType.NONE ? null : schemaData;
1373+
service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
1374+
if (topicOpt.isPresent()) {
1375+
Topic topic = topicOpt.get();
1376+
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
1377+
schemaVersionFuture.exceptionally(ex -> {
1378+
ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
1379+
ctx.writeAndFlush(Commands.newGetOrCreateSchemaResponseError(
1380+
requestId, errorCode, ex.getMessage()));
1381+
return null;
1382+
}).thenAccept(schemaVersion -> {
1383+
ctx.writeAndFlush(Commands.newGetOrCreateSchemaResponse(
1384+
requestId, schemaVersion));
1385+
});
1386+
} else {
1387+
ctx.writeAndFlush(Commands.newGetOrCreateSchemaResponseError(
1388+
requestId, ServerError.TopicNotFound, "Topic not found"));
1389+
}
1390+
}).exceptionally(ex -> {
1391+
ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
1392+
ctx.writeAndFlush(Commands.newGetOrCreateSchemaResponseError(
1393+
requestId, errorCode, ex.getMessage()));
1394+
return null;
1395+
});
1396+
}
1397+
1398+
private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
1399+
if (schema != null) {
1400+
return topic.addSchema(schema);
1401+
} else {
1402+
return topic.hasSchema().thenCompose((hasSchema) -> {
1403+
log.info("[{}] {} configured with schema {}",
1404+
remoteAddress, topic.getName(), hasSchema);
1405+
CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
1406+
if (hasSchema && (schemaValidationEnforced || topic.getSchemaValidationEnforced())) {
1407+
result.completeExceptionally(new IncompatibleSchemaException(
1408+
"Producers cannot connect or send message without a schema to topics with a schema"));
1409+
} else {
1410+
result.complete(SchemaVersion.Empty);
1411+
}
1412+
return result;
1413+
});
1414+
}
1415+
}
13781416

13791417
@Override
13801418
protected boolean isHandshakeCompleted() {

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java

+204-16
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,40 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import lombok.AllArgsConstructor;
22+
import lombok.Builder;
23+
import lombok.Data;
24+
import lombok.NoArgsConstructor;
25+
2126
import static java.nio.charset.StandardCharsets.UTF_8;
2227
import static org.testng.Assert.assertEquals;
2328
import static org.testng.Assert.assertNotNull;
2429
import static org.testng.Assert.assertTrue;
2530

31+
import org.apache.avro.reflect.ReflectData;
32+
import org.apache.avro.Schema.Parser;
33+
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
2634
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
35+
import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
2736
import org.apache.pulsar.client.api.schema.GenericRecord;
37+
import org.apache.pulsar.client.impl.ProducerBase;
38+
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
39+
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
2840
import org.apache.pulsar.common.schema.KeyValue;
2941
import org.apache.pulsar.common.schema.KeyValueEncodingType;
42+
import org.apache.pulsar.common.schema.SchemaInfo;
3043
import org.testng.Assert;
3144
import org.testng.annotations.AfterMethod;
3245
import org.testng.annotations.BeforeMethod;
3346
import org.testng.annotations.DataProvider;
3447
import org.testng.annotations.Factory;
3548
import org.testng.annotations.Test;
3649

50+
import java.io.ByteArrayInputStream;
51+
import java.util.Arrays;
52+
import java.util.List;
53+
import java.util.concurrent.TimeUnit;
54+
3755
public class SimpleSchemaTest extends ProducerConsumerBase {
3856

3957
@DataProvider(name = "batchingModes")
@@ -94,26 +112,30 @@ public void testString() throws Exception {
94112
}
95113
}
96114

115+
/**
 * First version of the test payload: a single primitive {@code int} field.
 * Used by the tests in this class to derive an Avro schema via
 * {@code Schema.AVRO(V1Data.class)}. Constructors, builder and accessors are
 * generated by the Lombok annotations below.
 */
@Data
116+
@Builder
117+
@NoArgsConstructor
118+
@AllArgsConstructor
97119
static class V1Data {
98120
int i;
121+
}
99122

100-
V1Data() {
101-
this.i = 0;
102-
}
103-
104-
V1Data(int i) {
105-
this.i = i;
106-
}
107-
108-
@Override
109-
public int hashCode() {
110-
return i;
111-
}
123+
/**
 * Second version of the test payload: the {@code int i} of {@code V1Data} plus
 * a boxed {@code Integer j}. The tests in this class expect {@code j} to be
 * {@code null} when a V1-encoded message is read with the V2 schema.
 */
@Data
124+
@Builder
125+
@NoArgsConstructor
126+
@AllArgsConstructor
127+
static class V2Data {
128+
int i;
129+
Integer j;
130+
}
112131

113-
@Override
114-
public boolean equals(Object other) {
115-
return (other instanceof V1Data) && i == ((V1Data)other).i;
116-
}
132+
/**
 * Test payload with two primitive {@code int} fields.
 * NOTE(review): the batch test treats sends with this type as ones that may be
 * rejected with {@code IncompatibleSchemaException}; presumably the primitive
 * {@code j} (no null default) is what makes it incompatible -- confirm against
 * the namespace's schema compatibility strategy.
 */
@Data
133+
@Builder
134+
@NoArgsConstructor
135+
@AllArgsConstructor
136+
static class IncompatibleData {
137+
int i;
138+
int j;
117139
}
118140

119141
@Test
@@ -185,6 +207,172 @@ public void newProducerWithoutSchemaOnTopicWithSchema() throws Exception {
185207
}
186208
}
187209

210+
@Test
211+
public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Exception {
212+
String topic = "my-property/my-ns/schema-test";
213+
Schema<V1Data> v1Schema = Schema.AVRO(V1Data.class);
214+
byte[] v1SchemaBytes = v1Schema.getSchemaInfo().getSchema();
215+
AvroWriter<V1Data> v1Writer = new AvroWriter<>(
216+
new Parser().parse(new ByteArrayInputStream(v1SchemaBytes)));
217+
Schema<V2Data> v2Schema = Schema.AVRO(V2Data.class);
218+
byte[] v2SchemaBytes = v2Schema.getSchemaInfo().getSchema();
219+
AvroWriter<V2Data> v2Writer = new AvroWriter<>(
220+
new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)));
221+
try (Producer<V1Data> ignored = pulsarClient.newProducer(v1Schema)
222+
.topic(topic).create()) {
223+
}
224+
try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
225+
.topic(topic).create()) {
226+
p.send(new V2Data(-1, -1));
227+
}
228+
V1Data dataV1 = new V1Data(2);
229+
V2Data dataV2 = new V2Data(3, 5);
230+
byte[] contentV1 = v1Writer.write(dataV1);
231+
byte[] contentV2 = v2Writer.write(dataV2);
232+
try (Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
233+
.topic(topic).create();
234+
Consumer<V2Data> c = pulsarClient.newConsumer(v2Schema)
235+
.topic(topic)
236+
.subscriptionName("sub1").subscribe()) {
237+
Assert.expectThrows(SchemaSerializationException.class, () -> p.send(contentV1));
238+
239+
((ProducerBase<byte[]>)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
240+
.value(contentV1).send();
241+
p.send(contentV2);
242+
Message<V2Data> msg1 = c.receive();
243+
V2Data msg1Value = msg1.getValue();
244+
Assert.assertEquals(dataV1.i, msg1Value.i);
245+
Assert.assertNull(msg1Value.j);
246+
Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
247+
248+
Message<V2Data> msg2 = c.receive();
249+
Assert.assertEquals(dataV2, msg2.getValue());
250+
Assert.assertEquals(msg2.getSchemaVersion(), new LongSchemaVersion(1).bytes());
251+
252+
try {
253+
((ProducerBase<byte[]>)p).newMessage(Schema.BYTES).value(contentV1).send();
254+
if (schemaValidationEnforced) {
255+
Assert.fail("Shouldn't be able to send to a schema'd topic with no schema"
256+
+ " if SchemaValidationEnabled is enabled");
257+
}
258+
Message<V2Data> msg3 = c.receive();
259+
Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
260+
} catch (PulsarClientException e) {
261+
if (schemaValidationEnforced) {
262+
Assert.assertTrue(e instanceof IncompatibleSchemaException);
263+
} else {
264+
Assert.fail("Shouldn't throw IncompatibleSchemaException"
265+
+ " if SchemaValidationEnforced is disabled");
266+
}
267+
}
268+
}
269+
}
270+
271+
@Test
272+
public void newProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Exception {
273+
String topic = "my-property/my-ns/schema-test";
274+
Schema<V1Data> v1Schema = Schema.AVRO(V1Data.class);
275+
byte[] v1SchemaBytes = v1Schema.getSchemaInfo().getSchema();
276+
AvroWriter<V1Data> v1Writer = new AvroWriter<>(
277+
new Parser().parse(new ByteArrayInputStream(v1SchemaBytes)));
278+
Schema<V2Data> v2Schema = Schema.AVRO(V2Data.class);
279+
byte[] v2SchemaBytes = v2Schema.getSchemaInfo().getSchema();
280+
AvroWriter<V2Data> v2Writer = new AvroWriter<>(
281+
new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)));
282+
try (Producer<byte[]> p = pulsarClient.newProducer()
283+
.topic(topic).create();
284+
Consumer<byte[]> c = pulsarClient.newConsumer()
285+
.topic(topic)
286+
.subscriptionName("sub1").subscribe()) {
287+
for (int i = 0; i < 2; ++i) {
288+
V1Data dataV1 = new V1Data(i);
289+
V2Data dataV2 = new V2Data(i, -i);
290+
byte[] contentV1 = v1Writer.write(dataV1);
291+
byte[] contentV2 = v2Writer.write(dataV2);
292+
((ProducerBase<byte[]>) p).newMessage(Schema.AUTO_PRODUCE_BYTES(v1Schema))
293+
.value(contentV1).send();
294+
Message<byte[]> msg1 = c.receive();
295+
Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
296+
Assert.assertEquals(msg1.getData(), contentV1);
297+
((ProducerBase<byte[]>) p).newMessage(Schema.AUTO_PRODUCE_BYTES(v2Schema))
298+
.value(contentV2).send();
299+
Message<byte[]> msg2 = c.receive();
300+
Assert.assertEquals(msg2.getSchemaVersion(), new LongSchemaVersion(1).bytes());
301+
Assert.assertEquals(msg2.getData(), contentV2);
302+
}
303+
}
304+
305+
List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
306+
Assert.assertEquals(allSchemas, Arrays.asList(v1Schema.getSchemaInfo(),
307+
v2Schema.getSchemaInfo()));
308+
}
309+
310+
@Test
311+
public void newProducerForMessageSchemaWithBatch() throws Exception {
312+
String topic = "my-property/my-ns/schema-test";
313+
Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
314+
.topic(topic)
315+
.subscriptionName("sub1").subscribe();
316+
Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
317+
.topic(topic)
318+
.enableBatching(true)
319+
.batchingMaxPublishDelay(10, TimeUnit.SECONDS).create();
320+
AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
321+
ReflectData.AllowNull.get().getSchema(V1Data.class));
322+
AvroWriter<V2Data> v2DataAvroWriter = new AvroWriter<>(
323+
ReflectData.AllowNull.get().getSchema(V2Data.class));
324+
AvroWriter<IncompatibleData> incompatibleDataAvroWriter = new AvroWriter<>(
325+
ReflectData.AllowNull.get().getSchema(IncompatibleData.class));
326+
int total = 20;
327+
int batch = 5;
328+
int incompatible = 3;
329+
for (int i = 0; i < total; ++i) {
330+
if (i / batch % 2 == 0) {
331+
byte[] content = v1DataAvroWriter.write(new V1Data(i));
332+
((ProducerBase<byte[]>)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
333+
.value(content).sendAsync();
334+
} else {
335+
byte[] content = v2DataAvroWriter.write(new V2Data(i, i + total));
336+
((ProducerBase<byte[]>)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V2Data.class)))
337+
.value(content).sendAsync();
338+
}
339+
if ((i + 1) % incompatible == 0) {
340+
byte[] content = incompatibleDataAvroWriter.write(new IncompatibleData(-i, -i));
341+
try {
342+
((ProducerBase<byte[]>)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(IncompatibleData.class)))
343+
.value(content).send();
344+
} catch (Exception e) {
345+
Assert.assertTrue(e instanceof IncompatibleSchemaException, e.getMessage());
346+
}
347+
}
348+
}
349+
p.flush();
350+
for (int i = 0; i < total; ++i) {
351+
V2Data value = c.receive().getValue();
352+
if (i / batch % 2 == 0) {
353+
Assert.assertNull(value.j);
354+
Assert.assertEquals(value.i, i);
355+
} else {
356+
Assert.assertEquals(value, new V2Data(i, i + total));
357+
}
358+
}
359+
c.close();
360+
}
361+
362+
@Test
363+
public void newProducerWithMultipleSchemaDisabled() throws Exception {
364+
String topic = "my-property/my-ns/schema-test";
365+
AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
366+
ReflectData.AllowNull.get().getSchema(V1Data.class));
367+
try (Producer<byte[]> p = pulsarClient.newProducer()
368+
.topic(topic)
369+
.enableMultiSchema(false).create()) {
370+
Assert.assertThrows(InvalidMessageException.class,
371+
() -> ((ProducerBase<byte[]>)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
372+
.value(v1DataAvroWriter.write(new V1Data(0))).send());
373+
}
374+
}
375+
188376
@Test
189377
public void newConsumerWithSchemaOnNewTopic() throws Exception {
190378
String topic = "my-property/my-ns/schema-test";

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java

+15
Original file line numberDiff line numberDiff line change
@@ -431,4 +431,19 @@ public interface ProducerBuilder<T> extends Cloneable {
431431
* @return the producer builder instance
432432
*/
433433
ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
434+
435+
/**
436+
* Control whether to enable the multiple schema mode for the producer.
437+
* If enabled, the producer can send a message with a schema different from the one specified when it was created,
438+
* otherwise an invalid message exception will be thrown
439+
* if the producer wants to send a message with a different schema.
440+
*
441+
* <p>Enabled by default.
442+
*
443+
* @param multiSchema
444+
* indicates to enable or disable multiple schema mode
445+
* @return the producer builder instance
446+
* @since 2.5.0
447+
*/
448+
ProducerBuilder<T> enableMultiSchema(boolean multiSchema);
434449
}

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java

+12
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,18 @@ static Schema<byte[]> AUTO_PRODUCE_BYTES() {
333333
return DefaultImplementation.newAutoProduceSchema();
334334
}
335335

336+
/**
337+
* Create a schema instance that accepts a serialized payload
338+
* and validates it against the schema specified.
339+
*
* @param schema the schema that serialized payloads are validated against
340+
* @return the auto schema instance
341+
* @since 2.5.0
342+
* @see #AUTO_PRODUCE_BYTES()
343+
*/
344+
static Schema<byte[]> AUTO_PRODUCE_BYTES(Schema<?> schema) {
345+
return DefaultImplementation.newAutoProduceSchema(schema);
346+
}
347+
336348
// CHECKSTYLE.ON: MethodName
337349

338350
static Schema<?> getSchema(SchemaInfo schemaInfo) {

0 commit comments

Comments
 (0)