Add docs for AUTO_CONSUME schema support
See #380
onobc committed Feb 2, 2024
1 parent 92bfe7c commit efaa8ad
Showing 3 changed files with 48 additions and 16 deletions.
@@ -184,10 +184,10 @@ Likewise, when the `topics` are not directly provided, a <<topic-resolution-proc

In the `PulsarListener` method shown earlier, we receive the data as `String`, but we do not specify any schema types.
Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type.
The framework detects that you expect the `String` type, infers the schema type from that information, and provides that schema to the consumer.
The framework performs this inference for all primitive types.
For all non-primitive types, the default schema is assumed to be JSON.
If a complex type uses anything besides JSON (such as AVRO or KEY_VALUE), you must provide the schema type on the annotation using the `schemaType` property.
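To make the inference rules concrete, the following hedged sketch contrasts the two cases (the topic names and the `Foo` type are illustrative): a complex type on a JSON-encoded topic needs no `schemaType`, while an AVRO-encoded one requires it.

[source, java]
----
// Complex type on a JSON topic: no schemaType needed, JSON is the default
@PulsarListener(topics = "foo-json-topic")
public void listenJson(Foo message) {
    System.out.println("JSON record: " + message);
}

// Complex type on an AVRO topic: the schema type must be declared
@PulsarListener(topics = "foo-avro-topic", schemaType = SchemaType.AVRO)
public void listenAvro(Foo message) {
    System.out.println("AVRO record: " + message);
}
----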

The following example shows another `PulsarListener` method, which takes an `Integer`:

@@ -209,9 +209,6 @@ public void listen(Foo message) {
}
----

Let us look at a few more ways to consume.

You can consume the Pulsar message directly:
@@ -250,7 +247,7 @@ Note that, in this example, we receive the records as a collection (`List`) of o
In addition, to enable batch consumption at the `PulsarListener` level, you need to set the `batch` property on the annotation to `true`.

Based on the actual type that the `List` holds, the framework tries to infer the schema to use.
If the `List` contains a complex type that uses anything besides JSON, you still need to provide the `schemaType` on `PulsarListener`.
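For example, a batch listener over a `String` topic might look like the following sketch (the topic name is illustrative):

[source, java]
----
// batch = true switches the listener to batch consumption;
// the String payload type lets the framework infer the schema
@PulsarListener(topics = "my-string-topic", batch = true)
public void listen(List<String> messages) {
    messages.forEach((msg) -> System.out.println("Received: " + msg));
}
----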

The following uses the `Message` envelope provided by the Pulsar Java client:

@@ -299,6 +296,24 @@ void listen(String message) {

TIP: The properties used are direct Pulsar consumer properties, not the `spring.pulsar.consumer` application configuration properties.
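As an illustrative sketch (the topic name and property values are assumptions), passing native consumer properties through the annotation might look like:

[source, java]
----
// Each entry is a native Pulsar consumer property in key=value form
@PulsarListener(topics = "some-topic",
        properties = { "subscriptionName=my-subscription", "receiverQueueSize=5000" })
void listen(String message) {
    System.out.println(message);
}
----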

==== Generic records with AUTO_CONSUME
If there is no way to know the schema type of a Pulsar topic in advance, you can use the `AUTO_CONSUME` schema type to consume generic records.
In this case, the consumer deserializes messages into `GenericRecord` objects, using the schema info associated with the topic.

To consume generic records, set `schemaType = SchemaType.AUTO_CONSUME` on your `@PulsarListener` and use a Pulsar message of type `GenericRecord` as the message parameter, as shown below.

[source, java]
----
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
        System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
----

TIP: The `GenericRecord` API allows access to the fields and their associated values.

==== Customizing the ConsumerBuilder

You can customize any fields available through `ConsumerBuilder` by providing a `@Bean` of type `PulsarListenerConsumerBuilderCustomizer` and then making it available to the `PulsarListener`, as shown below.
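A minimal sketch of such a customizer, assuming the bean is wired to the listener via a `consumerCustomizer` attribute on the annotation (the bean name and topic are illustrative):

[source, java]
----
// Customizer bean that tweaks the underlying Pulsar ConsumerBuilder
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return (builder) -> builder.consumerName("my-consumer");
}

// Listener referencing the customizer bean by name
@PulsarListener(topics = "some-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
    System.out.println(message);
}
----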
@@ -164,10 +164,11 @@ Likewise, when the `subscriptionName` is not provided on the `@ReactivePulsarLis

In the `ReactivePulsarListener` method shown earlier, we receive the data as `String`, but we do not specify any schema types.
Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type.
The framework detects that you expect the `String` type, infers the schema type from that information, and provides that schema to the consumer.
The framework performs this inference for all primitive types.
For all non-primitive types, the default schema is assumed to be JSON.
If a complex type uses anything besides JSON (such as AVRO or KEY_VALUE), you must provide the schema type on the annotation using the `schemaType` property.

This example shows how we can consume complex types from a topic:
[source, java]
@@ -179,9 +180,6 @@ Mono<Void> listen(Foo message) {
}
----

Let us look at a few more ways we can consume.

This example consumes the Pulsar message directly:
@@ -250,6 +248,25 @@ TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is i

TIP: The `spring.pulsar.consumer.subscription-type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default.
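For example, leaving the `subscriptionType` array empty defers to the configured property value (a sketch; the topic name is illustrative):

[source, java]
----
// Empty subscriptionType array: the spring.pulsar.consumer.subscription-type
// property value is used as the default instead of the annotation value
@ReactivePulsarListener(topics = "my-topic", subscriptionType = {})
Mono<Void> listen(String message) {
    return Mono.empty();
}
----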

==== Generic records with AUTO_CONSUME
If there is no way to know the schema type of a Pulsar topic in advance, you can use the `AUTO_CONSUME` schema type to consume generic records.
In this case, the consumer deserializes messages into `GenericRecord` objects, using the schema info associated with the topic.

To consume generic records, set `schemaType = SchemaType.AUTO_CONSUME` on your `@ReactivePulsarListener` and use a Pulsar message of type `GenericRecord` as the message parameter, as shown below.

[source, java]
----
@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
        System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
    return Mono.empty();
}
----

TIP: The `GenericRecord` API allows access to the fields and their associated values.


[[reactive-consumer-customizer]]
==== Consumer Customization
@@ -2,7 +2,7 @@
As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the `{listener-class}`.
For non-primitive types, if the Schema is not explicitly specified on the annotation, the Spring for Apache Pulsar framework will try to build a `Schema.JSON` from the type.

IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_CONSUME, and KEY_VALUE w/ INLINE encoding.

=== Custom Schema Mapping
As an alternative to specifying the schema on the `{listener-class}` for complex types, the schema resolver can be configured with mappings for the types.
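A hedged sketch of such a mapping customization, assuming the default resolver exposes an `addCustomSchemaMapping` method and a `SchemaResolverCustomizer` callback (the `User` type is illustrative):

[source, java]
----
// Registers an explicit schema for User so it need not be
// specified on each listener annotation
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> schemaResolver
            .addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
}
----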
