diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar.adoc index ce8e730a..1a0dffe7 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar.adoc @@ -184,10 +184,10 @@ Likewise, when the `topics` are not directly provided, a < message) { + GenericRecord record = message.getValue(); + record.getFields().forEach((f) -> + System.out.printf("%s = %s%n", f.getName(), record.getField(f))); +} +---- + +TIP: The `GenericRecord` API allows access to the fields and their associated values + ==== Customizing the ConsumerBuilder You can customize any fields available through `ConsumerBuilder` using a `PulsarListenerConsumerBuilderCustomizer` by providing a `@Bean` of type `PulsarListenerConsumerBuilderCustomizer` and then making it available to the `PulsarListener` as shown below. diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc index 01a6c3b3..acf701d6 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc @@ -164,10 +164,11 @@ Likewise, when the `subscriptionName` is not provided on the `@ReactivePulsarLis In the `ReactivePulsarListener` method shown earlier, we receive the data as `String`, but we do not specify any schema types. Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type. -The framework detects that you expect the `String` type and then infers the schema type based on that information. -Then it provides that schema to the consumer. -For all the primitive types in Java, the framework does this inference. -For any complex types (such as JSON, AVRO, and others), the framework cannot do this inference and the user needs to provide the schema type on the annotation using the `schemaType` property. + +The framework detects that you expect the `String` type and then infers the schema type based on that information and provides that schema to the consumer. +The framework does this inference for all primitive types. +For all non-primitive types the default schema is assumed to be JSON. +If a complex type is using anything besides JSON (such as AVRO or KEY_VALUE) you must provide the schema type on the annotation using the `schemaType` property. This example shows how we can consume complex types from a topic: [source, java] @@ -179,9 +180,6 @@ Mono listen(Foo message) { } ---- -Note the addition of a `schemaType` property on `ReactivePulsarListener`. -That is because the library is not capable of inferring the schema type from the provided type: `Foo`. We must tell the framework what schema to use. - Let us look at a few more ways we can consume. This example consumes the Pulsar message directly: @@ -250,6 +248,25 @@ TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is i TIP: The `spring.pulsar.consumer.subscription-type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default. +==== Generic records with AUTO_CONSUME +If there is no chance to know the type of schema of a Pulsar topic in advance, you can use the `AUTO_CONSUME` schema type to consume generic records. +In this case, the topic deserializes messages into `GenericRecord` objects using the schema info associated with the topic. + +To consume generic records set the `schemaType = SchemaType.AUTO_CONSUME` on your `@ReactivePulsarListener` and use a Pulsar message of type `GenericRecord` as the message parameter as shown below. + +[source, java] +---- +@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME) +Mono listen(org.apache.pulsar.client.api.Message message) { + GenericRecord record = message.getValue(); + record.getFields().forEach((f) -> + System.out.printf("%s = %s%n", f.getName(), record.getField(f))); + return Mono.empty(); +} +---- + +TIP: The `GenericRecord` API allows access to the fields and their associated values + [[reactive-consumer-customizer]] ==== Consumer Customization diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-listener.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-listener.adoc index c2e83517..bf8e7f7e 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-listener.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-listener.adoc @@ -2,7 +2,7 @@ As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the `{listener-class}`. For non-primitive types, if the Schema is not explicitly specified on the annotation, the Spring for Apache Pulsar framework will try to build a `Schema.JSON` from the type. -IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding. +IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_CONSUME, KEY_VALUE w/ INLINE encoding. === Custom Schema Mapping As an alternative to specifying the schema on the `{listener-class}` for complex types, the schema resolver can be configured with mappings for the types.