Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AUTO_CONSUME schema support #559

Merged
merged 3 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ Likewise, when the `topics` are not directly provided, a <<topic-resolution-proc

In the `PulsarListener` method shown earlier, we receive the data as `String`, but we do not specify any schema types.
Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type.
The framework detects that you expect the `String` type and then infers the schema type based on that information.
Then it provides that schema to the consumer.
For all the primitive types in Java, the framework does this inference.
For any complex types (such as JSON, AVRO, and others), the framework cannot do this inference and the user needs to provide the schema type on the annotation using the `schemaType` property.
The framework detects that you expect the `String` type and then infers the schema type based on that information and provides that schema to the consumer.
The framework does this inference for all primitive types.
For all non-primitive types the default schema is assumed to be JSON.
If a complex type is using anything besides JSON (such as AVRO or KEY_VALUE) you must provide the schema type on the annotation using the `schemaType` property.

The following example shows another `PulsarListener` method, which takes an `Integer`:

Expand All @@ -209,9 +209,6 @@ public void listen(Foo message) {
}
----

Note the addition of a `schemaType` property on `PulsarListener`.
That is because the library is not capable of inferring the schema type from the provided type: `Foo`. We must tell the framework what schema to use.

Let us look at a few more ways.

You can consume the Pulsar message directly:
Expand Down Expand Up @@ -250,7 +247,7 @@ Note that, in this example, we receive the records as a collection (`List`) of o
In addition, to enable batch consumption at the `PulsarListener` level, you need to set the `batch` property on the annotation to `true`.

Based on the actual type that the `List` holds, the framework tries to infer the schema to use.
If the `List` contains a complex type, you still need to provide the `schemaType` on `PulsarListener`.
If the `List` contains a complex type besides JSON, you still need to provide the `schemaType` on `PulsarListener`.

The following uses the `Message` envelope provided by the Pulsar Java client:

Expand Down Expand Up @@ -299,6 +296,24 @@ void listen(String message) {

TIP: The properties used are direct Pulsar consumer properties, not the `spring.pulsar.consumer` application configuration properties

==== Generic records with AUTO_CONSUME
If there is no chance to know the type of schema of a Pulsar topic in advance, you can use the `AUTO_CONSUME` schema type to consume generic records.
In this case, the topic deserializes messages into `GenericRecord` objects using the schema info associated with the topic.

To consume generic records set the `schemaType = SchemaType.AUTO_CONSUME` on your `@PulsarListener` and use a Pulsar message of type `GenericRecord` as the message parameter as shown below.

[source, java]
----
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
----

TIP: The `GenericRecord` API allows access to the fields and their associated values

==== Customizing the ConsumerBuilder

You can customize any fields available through `ConsumerBuilder` using a `PulsarListenerConsumerBuilderCustomizer` by providing a `@Bean` of type `PulsarListenerConsumerBuilderCustomizer` and then making it available to the `PulsarListener` as shown below.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ Likewise, when the `subscriptionName` is not provided on the `@ReactivePulsarLis

In the `ReactivePulsarListener` method shown earlier, we receive the data as `String`, but we do not specify any schema types.
Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type.
The framework detects that you expect the `String` type and then infers the schema type based on that information.
Then it provides that schema to the consumer.
For all the primitive types in Java, the framework does this inference.
For any complex types (such as JSON, AVRO, and others), the framework cannot do this inference and the user needs to provide the schema type on the annotation using the `schemaType` property.

The framework detects that you expect the `String` type and then infers the schema type based on that information and provides that schema to the consumer.
The framework does this inference for all primitive types.
For all non-primitive types the default schema is assumed to be JSON.
If a complex type is using anything besides JSON (such as AVRO or KEY_VALUE) you must provide the schema type on the annotation using the `schemaType` property.

This example shows how we can consume complex types from a topic:
[source, java]
Expand All @@ -179,9 +180,6 @@ Mono<Void> listen(Foo message) {
}
----

Note the addition of a `schemaType` property on `ReactivePulsarListener`.
That is because the library is not capable of inferring the schema type from the provided type: `Foo`. We must tell the framework what schema to use.

Let us look at a few more ways we can consume.

This example consumes the Pulsar message directly:
Expand Down Expand Up @@ -250,6 +248,25 @@ TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is i

TIP: The `spring.pulsar.consumer.subscription-type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default.

==== Generic records with AUTO_CONSUME
If there is no chance to know the type of schema of a Pulsar topic in advance, you can use the `AUTO_CONSUME` schema type to consume generic records.
In this case, the topic deserializes messages into `GenericRecord` objects using the schema info associated with the topic.

To consume generic records set the `schemaType = SchemaType.AUTO_CONSUME` on your `@ReactivePulsarListener` and use a Pulsar message of type `GenericRecord` as the message parameter as shown below.

[source, java]
----
@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
return Mono.empty();
}
----

TIP: The `GenericRecord` API allows access to the fields and their associated values


[[reactive-consumer-customizer]]
==== Consumer Customization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the `{listener-class}`.
For non-primitive types, if the Schema is not explicitly specified on the annotation, the Spring for Apache Pulsar framework will try to build a `Schema.JSON` from the type.

IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding.
IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_CONSUME, KEY_VALUE w/ INLINE encoding.

=== Custom Schema Mapping
As an alternative to specifying the schema on the `{listener-class}` for complex types, the schema resolver can be configured with mappings for the types.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,11 +26,11 @@
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.SmartMessageConverter;
Expand Down Expand Up @@ -65,6 +65,8 @@
*/
public class MethodReactivePulsarListenerEndpoint<V> extends AbstractReactivePulsarListenerEndpoint<V> {

private final LogAccessor logger = new LogAccessor(this.getClass());

private Object bean;

private Method method;
Expand Down Expand Up @@ -134,18 +136,29 @@ protected AbstractPulsarMessageToSpringMessageAdapter<V> createMessageHandler(
messageParameter = parameter.get();
}

DefaultReactivePulsarMessageListenerContainer<?> containerInstance = (DefaultReactivePulsarMessageListenerContainer<?>) container;
ReactivePulsarContainerProperties<?> pulsarContainerProperties = containerInstance.getContainerProperties();
DefaultReactivePulsarMessageListenerContainer<Object> containerInstance = (DefaultReactivePulsarMessageListenerContainer<Object>) container;
ReactivePulsarContainerProperties<Object> pulsarContainerProperties = containerInstance
.getContainerProperties();

// Resolve the schema using the reader schema type
SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver();
SchemaType schemaType = pulsarContainerProperties.getSchemaType();
ResolvableType messageType = resolvableType(messageParameter);
schemaResolver.resolveSchema(schemaType, messageType)
.ifResolved(schema -> pulsarContainerProperties.setSchema((Schema) schema));

// Make sure the schemaType is updated to match the current schema
.ifResolvedOrElse(pulsarContainerProperties::setSchema,
(ex) -> this.logger
.warn(() -> "Failed to resolve schema for type %s - will default to BYTES (due to: %s)"
.formatted(schemaType, ex.getMessage())));

// Attempt to make sure the schemaType is updated to match the resolved schema.
// This can occur when the resolver returns a schema that is not necessarily of
// the same type as the input scheme type (e.g. SchemaType.NONE uses the message
// type to determine the schema.
if (pulsarContainerProperties.getSchema() != null) {
SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType();
pulsarContainerProperties.setSchemaType(type);
var schemaInfo = pulsarContainerProperties.getSchema().getSchemaInfo();
if (schemaInfo != null) {
pulsarContainerProperties.setSchemaType(schemaInfo.getType());
}
}

// If no topic info is set on endpoint attempt to resolve via message type
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.reactive.listener;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.jupiter.api.Test;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerAutoConsumeSchemaTests.ReactivePulsarListenerAutoConsumeSchemaTestsConfig;
import org.springframework.pulsar.test.support.model.UserPojo;
import org.springframework.pulsar.test.support.model.UserRecord;
import org.springframework.test.context.ContextConfiguration;

import reactor.core.publisher.Mono;

/**
* Tests for {@link ReactivePulsarListener @ReactivePulsarListener} using
* {@code schemaType} of {@link SchemaType#AUTO_CONSUME}.
*
* @author Chris Bono
*/
@ContextConfiguration(classes = ReactivePulsarListenerAutoConsumeSchemaTestsConfig.class)
class ReactivePulsarListenerAutoConsumeSchemaTests extends ReactivePulsarListenerTestsBase {

static final String STRING_TOPIC = "placst-str-topic";
static CountDownLatch stringLatch = new CountDownLatch(3);
static List<String> stringMessages = new ArrayList<>();

static final String JSON_TOPIC = "placst-json-topic";
static CountDownLatch jsonLatch = new CountDownLatch(3);
static List<Map<String, Object>> jsonMessages = new ArrayList<>();

static final String AVRO_TOPIC = "placst-avro-topic";
static CountDownLatch avroLatch = new CountDownLatch(3);
static List<Map<String, Object>> avroMessages = new ArrayList<>();

static final String KEYVALUE_TOPIC = "placst-kv-topic";
static CountDownLatch keyValueLatch = new CountDownLatch(3);
static List<Map<String, Object>> keyValueMessages = new ArrayList<>();

@Test
void stringSchema() throws Exception {
var pulsarProducerFactory = new DefaultPulsarProducerFactory<String>(pulsarClient);
var template = new PulsarTemplate<>(pulsarProducerFactory);
var expectedMessages = new ArrayList<String>();
for (int i = 0; i < 3; i++) {
var msg = "str-" + i;
template.send(STRING_TOPIC, msg, Schema.STRING);
expectedMessages.add(msg);
}
assertThat(stringLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(stringMessages).containsExactlyInAnyOrderElementsOf(expectedMessages);
}

@Test
void jsonSchema() throws Exception {
var pulsarProducerFactory = new DefaultPulsarProducerFactory<UserRecord>(pulsarClient);
var template = new PulsarTemplate<>(pulsarProducerFactory);
var schema = JSONSchema.of(UserRecord.class);
var expectedMessages = new ArrayList<Map<String, Object>>();
for (int i = 0; i < 3; i++) {
var user = new UserRecord("Jason", i);
template.send(JSON_TOPIC, user, schema);
expectedMessages.add(Map.of("name", user.name(), "age", user.age()));
}
assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(jsonMessages).containsExactlyInAnyOrderElementsOf(expectedMessages);
}

@Test
void avroSchema() throws Exception {
var pulsarProducerFactory = new DefaultPulsarProducerFactory<UserPojo>(pulsarClient);
var template = new PulsarTemplate<>(pulsarProducerFactory);
var schema = AvroSchema.of(UserPojo.class);
var expectedMessages = new ArrayList<Map<String, Object>>();
for (int i = 0; i < 3; i++) {
var user = new UserPojo("Avi", i);
template.send(AVRO_TOPIC, user, schema);
expectedMessages.add(Map.of("name", user.getName(), "age", user.getAge()));
}
assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(avroMessages).containsExactlyInAnyOrderElementsOf(expectedMessages);
}

@Test
void keyValueSchema() throws Exception {
var pulsarProducerFactory = new DefaultPulsarProducerFactory<KeyValue<String, Integer>>(pulsarClient);
var template = new PulsarTemplate<>(pulsarProducerFactory);
var kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE);
var expectedMessages = new ArrayList<Map<String, Object>>();
for (int i = 0; i < 3; i++) {
var kv = new KeyValue<>("Kevin", i);
template.send(KEYVALUE_TOPIC, kv, kvSchema);
expectedMessages.add(Map.of(kv.getKey(), kv.getValue()));
}
assertThat(keyValueLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(keyValueMessages).containsExactlyInAnyOrderElementsOf(expectedMessages);
}

@EnablePulsar
@Configuration
static class ReactivePulsarListenerAutoConsumeSchemaTestsConfig {

@ReactivePulsarListener(id = "stringAcListener", topics = STRING_TOPIC, schemaType = SchemaType.AUTO_CONSUME,
consumerCustomizer = "earliestCustomizer")
Mono<Void> listenString(Message<GenericRecord> genericMessage) {
assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(String.class);
stringMessages.add(genericMessage.getValue().getNativeObject().toString());
stringLatch.countDown();
return Mono.empty();
}

@ReactivePulsarListener(id = "jsonAcListener", topics = JSON_TOPIC, schemaType = SchemaType.AUTO_CONSUME,
consumerCustomizer = "earliestCustomizer")
Mono<Void> listenJson(Message<GenericRecord> genericMessage) {
assertThat(genericMessage.getValue()).isInstanceOf(GenericJsonRecord.class);
GenericJsonRecord record = (GenericJsonRecord) genericMessage.getValue();
assertThat(record.getSchemaType()).isEqualTo(SchemaType.JSON);
assertThat(record).extracting("schemaInfo")
.satisfies((obj) -> assertThat(obj.toString()).contains("\"name\": \"UserRecord\""));
jsonMessages.add(record.getFields()
.stream()
.map(Field::getName)
.collect(Collectors.toMap(Function.identity(), record::getField)));
jsonLatch.countDown();
return Mono.empty();
}

@ReactivePulsarListener(id = "avroAcListener", topics = AVRO_TOPIC, schemaType = SchemaType.AUTO_CONSUME,
consumerCustomizer = "earliestCustomizer")
Mono<Void> listenAvro(Message<GenericRecord> genericMessage) {
assertThat(genericMessage.getValue()).isInstanceOf(GenericAvroRecord.class);
GenericAvroRecord record = (GenericAvroRecord) genericMessage.getValue();
assertThat(record.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(record).extracting("schema")
.satisfies((obj) -> assertThat(obj.toString()).contains("\"name\":\"UserPojo\""));
avroMessages.add(record.getFields()
.stream()
.map(Field::getName)
.collect(Collectors.toMap(Function.identity(), record::getField)));
avroLatch.countDown();
return Mono.empty();
}

@SuppressWarnings("unchecked")
@ReactivePulsarListener(id = "keyvalueAcListener", topics = KEYVALUE_TOPIC,
schemaType = SchemaType.AUTO_CONSUME, consumerCustomizer = "earliestCustomizer")
Mono<Void> listenKeyvalue(Message<GenericRecord> genericMessage) {
assertThat(genericMessage.getValue().getSchemaType()).isEqualTo(SchemaType.KEY_VALUE);
assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(KeyValue.class);
var record = (KeyValue<String, Object>) genericMessage.getValue().getNativeObject();
keyValueMessages.add(Map.of(record.getKey(), record.getValue()));
keyValueLatch.countDown();
return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<?> earliestCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}

}

}
Loading
Loading