diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar.adoc index ce8e730a..1a0dffe7 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar.adoc @@ -184,10 +184,10 @@ Likewise, when the `topics` are not directly provided, a < message) { + GenericRecord record = message.getValue(); + record.getFields().forEach((f) -> + System.out.printf("%s = %s%n", f.getName(), record.getField(f))); +} +---- + +TIP: The `GenericRecord` API allows access to the fields and their associated values + ==== Customizing the ConsumerBuilder You can customize any fields available through `ConsumerBuilder` using a `PulsarListenerConsumerBuilderCustomizer` by providing a `@Bean` of type `PulsarListenerConsumerBuilderCustomizer` and then making it available to the `PulsarListener` as shown below. diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc index 01a6c3b3..acf701d6 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc @@ -164,10 +164,11 @@ Likewise, when the `subscriptionName` is not provided on the `@ReactivePulsarLis In the `ReactivePulsarListener` method shown earlier, we receive the data as `String`, but we do not specify any schema types. Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type. -The framework detects that you expect the `String` type and then infers the schema type based on that information. -Then it provides that schema to the consumer. -For all the primitive types in Java, the framework does this inference. -For any complex types (such as JSON, AVRO, and others), the framework cannot do this inference and the user needs to provide the schema type on the annotation using the `schemaType` property. + +The framework detects that you expect the `String` type and then infers the schema type based on that information and provides that schema to the consumer. +The framework does this inference for all primitive types. +For all non-primitive types the default schema is assumed to be JSON. +If a complex type is using anything besides JSON (such as AVRO or KEY_VALUE) you must provide the schema type on the annotation using the `schemaType` property. This example shows how we can consume complex types from a topic: [source, java] @@ -179,9 +180,6 @@ Mono listen(Foo message) { } ---- -Note the addition of a `schemaType` property on `ReactivePulsarListener`. -That is because the library is not capable of inferring the schema type from the provided type: `Foo`. We must tell the framework what schema to use. - Let us look at a few more ways we can consume. This example consumes the Pulsar message directly: @@ -250,6 +248,25 @@ TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is i TIP: The `spring.pulsar.consumer.subscription-type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default. +==== Generic records with AUTO_CONSUME +If there is no chance to know the type of schema of a Pulsar topic in advance, you can use the `AUTO_CONSUME` schema type to consume generic records. +In this case, the topic deserializes messages into `GenericRecord` objects using the schema info associated with the topic. + +To consume generic records set the `schemaType = SchemaType.AUTO_CONSUME` on your `@ReactivePulsarListener` and use a Pulsar message of type `GenericRecord` as the message parameter as shown below. + +[source, java] +---- +@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME) +Mono listen(org.apache.pulsar.client.api.Message message) { + GenericRecord record = message.getValue(); + record.getFields().forEach((f) -> + System.out.printf("%s = %s%n", f.getName(), record.getField(f))); + return Mono.empty(); +} +---- + +TIP: The `GenericRecord` API allows access to the fields and their associated values + [[reactive-consumer-customizer]] ==== Consumer Customization diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-listener.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-listener.adoc index c2e83517..bf8e7f7e 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-listener.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-listener.adoc @@ -2,7 +2,7 @@ As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the `{listener-class}`. For non-primitive types, if the Schema is not explicitly specified on the annotation, the Spring for Apache Pulsar framework will try to build a `Schema.JSON` from the type. -IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding. +IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_CONSUME, KEY_VALUE w/ INLINE encoding. === Custom Schema Mapping As an alternative to specifying the schema on the `{listener-class}` for complex types, the schema resolver can be configured with mappings for the types. diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java index d39386ff..585a6f40 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,11 +26,11 @@ import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaType; import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; +import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.SmartMessageConverter; @@ -65,6 +65,8 @@ */ public class MethodReactivePulsarListenerEndpoint extends AbstractReactivePulsarListenerEndpoint { + private final LogAccessor logger = new LogAccessor(this.getClass()); + private Object bean; private Method method; @@ -134,18 +136,29 @@ protected AbstractPulsarMessageToSpringMessageAdapter createMessageHandler( messageParameter = parameter.get(); } - DefaultReactivePulsarMessageListenerContainer containerInstance = (DefaultReactivePulsarMessageListenerContainer) container; - ReactivePulsarContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties(); + DefaultReactivePulsarMessageListenerContainer containerInstance = (DefaultReactivePulsarMessageListenerContainer) container; + ReactivePulsarContainerProperties pulsarContainerProperties = containerInstance + .getContainerProperties(); + + // Resolve the schema using the reader schema type SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver(); SchemaType schemaType = pulsarContainerProperties.getSchemaType(); ResolvableType messageType = resolvableType(messageParameter); schemaResolver.resolveSchema(schemaType, messageType) - .ifResolved(schema -> pulsarContainerProperties.setSchema((Schema) schema)); - - // Make sure the schemaType is updated to match the current schema + .ifResolvedOrElse(pulsarContainerProperties::setSchema, + (ex) -> this.logger + .warn(() -> "Failed to resolve schema for type %s - will default to BYTES (due to: %s)" + .formatted(schemaType, ex.getMessage()))); + + // Attempt to make sure the schemaType is updated to match the resolved schema. + // This can occur when the resolver returns a schema that is not necessarily of + // the same type as the input scheme type (e.g. SchemaType.NONE uses the message + // type to determine the schema. if (pulsarContainerProperties.getSchema() != null) { - SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType(); - pulsarContainerProperties.setSchemaType(type); + var schemaInfo = pulsarContainerProperties.getSchema().getSchemaInfo(); + if (schemaInfo != null) { + pulsarContainerProperties.setSchemaType(schemaInfo.getType()); + } } // If no topic info is set on endpoint attempt to resolve via message type diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerAutoConsumeSchemaTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerAutoConsumeSchemaTests.java new file mode 100644 index 00000000..3d0f4bb1 --- /dev/null +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerAutoConsumeSchemaTests.java @@ -0,0 +1,205 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reactive.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.jupiter.api.Test; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerAutoConsumeSchemaTests.ReactivePulsarListenerAutoConsumeSchemaTestsConfig; +import org.springframework.pulsar.test.support.model.UserPojo; +import org.springframework.pulsar.test.support.model.UserRecord; +import org.springframework.test.context.ContextConfiguration; + +import reactor.core.publisher.Mono; + +/** + * Tests for {@link ReactivePulsarListener @ReactivePulsarListener} using + * {@code schemaType} of {@link SchemaType#AUTO_CONSUME}. + * + * @author Chris Bono + */ +@ContextConfiguration(classes = ReactivePulsarListenerAutoConsumeSchemaTestsConfig.class) +class ReactivePulsarListenerAutoConsumeSchemaTests extends ReactivePulsarListenerTestsBase { + + static final String STRING_TOPIC = "placst-str-topic"; + static CountDownLatch stringLatch = new CountDownLatch(3); + static List stringMessages = new ArrayList<>(); + + static final String JSON_TOPIC = "placst-json-topic"; + static CountDownLatch jsonLatch = new CountDownLatch(3); + static List> jsonMessages = new ArrayList<>(); + + static final String AVRO_TOPIC = "placst-avro-topic"; + static CountDownLatch avroLatch = new CountDownLatch(3); + static List> avroMessages = new ArrayList<>(); + + static final String KEYVALUE_TOPIC = "placst-kv-topic"; + static CountDownLatch keyValueLatch = new CountDownLatch(3); + static List> keyValueMessages = new ArrayList<>(); + + @Test + void stringSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var expectedMessages = new ArrayList(); + for (int i = 0; i < 3; i++) { + var msg = "str-" + i; + template.send(STRING_TOPIC, msg, Schema.STRING); + expectedMessages.add(msg); + } + assertThat(stringLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(stringMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void jsonSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = JSONSchema.of(UserRecord.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserRecord("Jason", i); + template.send(JSON_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.name(), "age", user.age())); + } + assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(jsonMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void avroSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = AvroSchema.of(UserPojo.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserPojo("Avi", i); + template.send(AVRO_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.getName(), "age", user.getAge())); + } + assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(avroMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void keyValueSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory>(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var kv = new KeyValue<>("Kevin", i); + template.send(KEYVALUE_TOPIC, kv, kvSchema); + expectedMessages.add(Map.of(kv.getKey(), kv.getValue())); + } + assertThat(keyValueLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(keyValueMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @EnablePulsar + @Configuration + static class ReactivePulsarListenerAutoConsumeSchemaTestsConfig { + + @ReactivePulsarListener(id = "stringAcListener", topics = STRING_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + consumerCustomizer = "earliestCustomizer") + Mono listenString(Message genericMessage) { + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(String.class); + stringMessages.add(genericMessage.getValue().getNativeObject().toString()); + stringLatch.countDown(); + return Mono.empty(); + } + + @ReactivePulsarListener(id = "jsonAcListener", topics = JSON_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + consumerCustomizer = "earliestCustomizer") + Mono listenJson(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericJsonRecord.class); + GenericJsonRecord record = (GenericJsonRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.JSON); + assertThat(record).extracting("schemaInfo") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\": \"UserRecord\"")); + jsonMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + jsonLatch.countDown(); + return Mono.empty(); + } + + @ReactivePulsarListener(id = "avroAcListener", topics = AVRO_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + consumerCustomizer = "earliestCustomizer") + Mono listenAvro(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericAvroRecord.class); + GenericAvroRecord record = (GenericAvroRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.AVRO); + assertThat(record).extracting("schema") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\":\"UserPojo\"")); + avroMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + avroLatch.countDown(); + return Mono.empty(); + } + + @SuppressWarnings("unchecked") + @ReactivePulsarListener(id = "keyvalueAcListener", topics = KEYVALUE_TOPIC, + schemaType = SchemaType.AUTO_CONSUME, consumerCustomizer = "earliestCustomizer") + Mono listenKeyvalue(Message genericMessage) { + assertThat(genericMessage.getValue().getSchemaType()).isEqualTo(SchemaType.KEY_VALUE); + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(KeyValue.class); + var record = (KeyValue) genericMessage.getValue().getNativeObject(); + keyValueMessages.add(Map.of(record.getKey(), record.getValue())); + keyValueLatch.countDown(); + return Mono.empty(); + } + + @Bean + ReactivePulsarListenerMessageConsumerBuilderCustomizer earliestCustomizer() { + return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); + } + + } + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserPojo.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserPojo.java new file mode 100644 index 00000000..f3f2d939 --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserPojo.java @@ -0,0 +1,79 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support.model; + +import java.util.Objects; + +/** + * Test object (user) defined via standard Java beans get/set methods. + *

+ * WARN Do not convert this to a Record as this is used for Avro tests and Avro + * does not work well w/ records yet. + */ +public class UserPojo { + + private String name; + + private int age; + + UserPojo() { + } + + public UserPojo(String name, int age) { + this.name = name; + this.age = age; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UserPojo user = (UserPojo) o; + return age == user.age && Objects.equals(name, user.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, age); + } + + @Override + public String toString() { + return "User{" + "name='" + name + '\'' + ", age=" + age + '}'; + } + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserRecord.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserRecord.java new file mode 100644 index 00000000..2cf3c0bc --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserRecord.java @@ -0,0 +1,26 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support.model; + +/** + * Test object (user) defined via a Java record. + * + * @param name the user's name + * @param age the user's age + */ +public record UserRecord(String name, int age) { +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java index 885c0835..03fa4cd8 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; +import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.SmartMessageConverter; @@ -66,6 +67,8 @@ */ public class MethodPulsarListenerEndpoint extends AbstractPulsarListenerEndpoint { + private final LogAccessor logger = new LogAccessor(this.getClass()); + private Object bean; private Method method; @@ -140,15 +143,26 @@ protected AbstractPulsarMessageToSpringMessageAdapter createMessageListener( ConcurrentPulsarMessageListenerContainer containerInstance = (ConcurrentPulsarMessageListenerContainer) container; PulsarContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties(); + + // Resolve the schema using the listener schema type SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver(); SchemaType schemaType = pulsarContainerProperties.getSchemaType(); ResolvableType messageType = resolvableType(messageParameter); - schemaResolver.resolveSchema(schemaType, messageType).ifResolved(pulsarContainerProperties::setSchema); - - // Make sure the schemaType is updated to match the current schema + schemaResolver.resolveSchema(schemaType, messageType) + .ifResolvedOrElse(pulsarContainerProperties::setSchema, + (ex) -> this.logger + .warn(() -> "Failed to resolve schema for type %s - will default to BYTES (due to: %s)" + .formatted(schemaType, ex.getMessage()))); + + // Attempt to make sure the schemaType is updated to match the resolved schema. + // This can occur when the resolver returns a schema that is not necessarily of + // the same type as the input scheme type (e.g. SchemaType.NONE uses the message + // type to determine the schema. if (pulsarContainerProperties.getSchema() != null) { - SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType(); - pulsarContainerProperties.setSchemaType(type); + var schemaInfo = pulsarContainerProperties.getSchema().getSchemaInfo(); + if (schemaInfo != null) { + pulsarContainerProperties.setSchemaType(schemaInfo.getType()); + } } // If no topic info is set on endpoint attempt to resolve via message type diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java index 46675af7..b38db417 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; +import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.SmartMessageConverter; @@ -56,6 +57,8 @@ */ public class MethodPulsarReaderEndpoint extends AbstractPulsarReaderEndpoint { + private final LogAccessor logger = new LogAccessor(this.getClass()); + private Object bean; private Method method; @@ -114,21 +117,30 @@ protected AbstractPulsarMessageToSpringMessageAdapter createReaderListener( DefaultPulsarMessageReaderContainer containerInstance = (DefaultPulsarMessageReaderContainer) container; PulsarReaderContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties(); + + // Resolve the schema using the reader schema type SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver(); SchemaType schemaType = pulsarContainerProperties.getSchemaType(); ResolvableType messageType = resolvableType(messageParameter); - schemaResolver.resolveSchema(schemaType, messageType).ifResolved(pulsarContainerProperties::setSchema); - - // Make sure the schemaType is updated to match the current schema + schemaResolver.resolveSchema(schemaType, messageType) + .ifResolvedOrElse(pulsarContainerProperties::setSchema, + (ex) -> this.logger + .warn(() -> "Failed to resolve schema for type %s - will default to BYTES (due to: %s)" + .formatted(schemaType, ex.getMessage()))); + + // Attempt to make sure the schemaType is updated to match the resolved schema. + // This can occur when the resolver returns a schema that is not necessarily of + // the same type as the input scheme type (e.g. SchemaType.NONE uses the message + // type to determine the schema. if (pulsarContainerProperties.getSchema() != null) { - SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType(); - pulsarContainerProperties.setSchemaType(type); + var schemaInfo = pulsarContainerProperties.getSchema().getSchemaInfo(); + if (schemaInfo != null) { + pulsarContainerProperties.setSchemaType(schemaInfo.getType()); + } } // TODO: If no topic info is set on endpoint attempt to resolve via message type - container.setReaderCustomizer(this.readerBuilderCustomizer); - return readerListener; } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java index bdc730cb..802ffbc1 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -185,6 +185,7 @@ public Resolved> resolveSchema(SchemaType schemaType, @Nullable Re requireNonNullMessageType(schemaType, messageType); yield getMessageKeyValueSchema(messageType); } + case AUTO_CONSUME -> Schema.AUTO_CONSUME(); case NONE -> { if (messageType == null || messageType.getRawClass() == null) { yield Schema.BYTES; diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/Resolved.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/Resolved.java index 38d15cb3..134d9288 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/Resolved.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/Resolved.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ * * @param the resolved type * @author Christophe Bornet + * @author Chris Bono */ public final class Resolved { @@ -41,28 +42,96 @@ private Resolved(@Nullable T value, @Nullable RuntimeException exception) { this.exception = exception; } + /** + * Factory method to create a {@code Resolved} when resolution succeeds. + * @param value the non-{@code null} resolved value + * @param the type of the value + * @return a {@code Resolved} containing the resolved value + */ public static Resolved of(T value) { return new Resolved<>(value, null); } + /** + * Factory method to create a {@code Resolved} when resolution fails. + * @param reason the non-{@code null} reason the resolution failed + * @param the type of the value + * @return a {@code Resolved} containing an {@link IllegalArgumentException} with the + * reason for the failure + */ public static Resolved failed(String reason) { return new Resolved<>(null, new IllegalArgumentException(reason)); } - public static Resolved failed(RuntimeException e) { - return new Resolved<>(null, e); + /** + * Factory method to create a {@code Resolved} when resolution fails. + * @param reason the non-{@code null} reason the resolution failed + * @param the type of the value + * @return a {@code Resolved} containing the reason for the failure + */ + public static Resolved failed(RuntimeException reason) { + return new Resolved<>(null, reason); } + /** + * Gets the optional resolved value. + * @return an optional with the resolved value or empty if failed to resolve + * @deprecated Use {@link #value()} instead + */ + @Deprecated(since = "1.1.0", forRemoval = true) public Optional get() { + return value(); + } + + /** + * Gets the resolved value. + * @return an optional with the resolved value or empty if failed to resolve + */ + public Optional value() { return Optional.ofNullable(this.value); } + /** + * Gets the exception that may have occurred during resolution. + * @return an optional with the resolution exception or empty if no error occurred + */ + public Optional exception() { + return Optional.ofNullable(this.exception); + } + + /** + * Performs the given action with the resolved value if a value was resolved and no + * exception occurred. + * @param action the action to be performed + */ public void ifResolved(Consumer action) { - if (this.value != null) { + if (this.value != null && this.exception == null) { action.accept(this.value); } } + /** + * Performs the given action with the resolved value if a non-{@code null} value was + * resolved and no exception occurred. Otherwise, if an exception occurred then the + * provided error action is performed with the exception. + * @param action the action to be performed + * @param errorAction the error action to be performed + */ + public void ifResolvedOrElse(Consumer action, Consumer errorAction) { + if (this.value != null && this.exception == null) { + action.accept(this.value); + } + else if (this.exception != null) { + errorAction.accept(this.exception); + } + } + + /** + * Returns the resolved value if a value was resolved and no exception occurred, + * otherwise throws the resolution exception back to the caller. + * @return the resolved value if a value was resolved and no exception occurred + * @throws RuntimeException if an exception occurred during resolution + */ public T orElseThrow() { if (this.value == null && this.exception != null) { throw this.exception; @@ -70,6 +139,15 @@ public T orElseThrow() { return this.value; } + /** + * Returns the resolved value if a value was resolved and no exception occurred, + * otherwise wraps the resolution exception with the provided error message and throws + * back to the caller. + * @param wrappingErrorMessage additional context to add to the wrapped exception + * @return the resolved value if a value was resolved and no exception occurred + * @throws RuntimeException wrapping the resolution exception if an exception occurred + * during resolution + */ public T orElseThrow(Supplier wrappingErrorMessage) { if (this.value == null && this.exception != null) { throw new RuntimeException(wrappingErrorMessage.get(), this.exception); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java index 24307d17..09e6799d 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.KeyValueSchema; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.ProtobufSchema; @@ -255,6 +256,17 @@ void structSchemas() { })); } + @Test + void autoConsumeSchema() { + assertThat(resolver.resolveSchema(SchemaType.AUTO_CONSUME, ResolvableType.forType(Foo.class)).orElseThrow()) + .isInstanceOf(AutoConsumeSchema.class); + assertThat( + resolver.resolveSchema(SchemaType.AUTO_CONSUME, ResolvableType.forType(String.class)).orElseThrow()) + .isInstanceOf(AutoConsumeSchema.class); + assertThat(resolver.resolveSchema(SchemaType.AUTO_CONSUME, null).orElseThrow()) + .isInstanceOf(AutoConsumeSchema.class); + } + @ParameterizedTest @EnumSource(value = SchemaType.class, names = { "JSON", "AVRO", "PROTOBUF", "KEY_VALUE" }) void structSchemasRequireMessageType(SchemaType schemaType) { @@ -264,7 +276,7 @@ void structSchemasRequireMessageType(SchemaType schemaType) { } @ParameterizedTest - @EnumSource(value = SchemaType.class, names = { "PROTOBUF_NATIVE", "AUTO", "AUTO_CONSUME", "AUTO_PUBLISH" }) + @EnumSource(value = SchemaType.class, names = { "PROTOBUF_NATIVE", "AUTO", "AUTO_PUBLISH" }) void unsupportedSchemaTypes(SchemaType unsupportedType) { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> resolver.resolveSchema(unsupportedType, null).orElseThrow()) diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultTopicResolverTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultTopicResolverTests.java index b57eea1a..937aa829 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultTopicResolverTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultTopicResolverTests.java @@ -58,7 +58,7 @@ void addMappingsToResolver() { @MethodSource("resolveNoMessageInfoProvider") void resolveNoMessageInfo(String testName, @Nullable String userTopic, @Nullable String defaultTopic, @Nullable String expectedTopic) { - assertThat(resolver.resolveTopic(userTopic, () -> defaultTopic).get().orElse(null)).isEqualTo(expectedTopic); + assertThat(resolver.resolveTopic(userTopic, () -> defaultTopic).value().orElse(null)).isEqualTo(expectedTopic); } static Stream resolveNoMessageInfoProvider() { @@ -76,7 +76,7 @@ static Stream resolveNoMessageInfoProvider() { @MethodSource("resolveByMessageInstanceProvider") void resolveByMessageInstance(String testName, @Nullable String userTopic, T message, @Nullable String defaultTopic, @Nullable String expectedTopic) { - assertThat(resolver.resolveTopic(userTopic, message, () -> defaultTopic).get().orElse(null)) + assertThat(resolver.resolveTopic(userTopic, message, () -> defaultTopic).value().orElse(null)) .isEqualTo(expectedTopic); } @@ -99,7 +99,7 @@ static Stream resolveByMessageInstanceProvider() { @MethodSource("resolveByMessageTypeProvider") void resolveByMessageType(String testName, @Nullable String userTopic, Class messageType, @Nullable String defaultTopic, @Nullable String expectedTopic) { - assertThat(resolver.resolveTopic(userTopic, messageType, () -> defaultTopic).get().orElse(null)) + assertThat(resolver.resolveTopic(userTopic, messageType, () -> defaultTopic).value().orElse(null)) .isEqualTo(expectedTopic); } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/ResolvedTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/ResolvedTests.java index 2df149b2..1b15534c 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/ResolvedTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/ResolvedTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,15 @@ import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.assertj.core.api.Assertions.assertThatRuntimeException; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import java.util.function.Consumer; + +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; /** * Unit tests for {@link Resolved}. @@ -30,33 +37,119 @@ */ class ResolvedTests { + @SuppressWarnings("removal") @Test - void success() { - assertThat(Resolved.of("good").get()).hasValue("good"); - assertThat(Resolved.of("good").orElseThrow()).isEqualTo("good"); + void deprecatedGetDelegatesToNewValueMethod() { + Resolved resolved = Mockito.spy(Resolved.of("hello")); + resolved.get(); + verify(resolved).value(); } - @Test - void failedWithSimpleReason() { - var resolved = Resolved.failed("oops"); - assertThatIllegalArgumentException().isThrownBy(resolved::orElseThrow).withMessage("oops"); - assertThat(resolved.get()).isEmpty(); + @SuppressWarnings("unchecked") + static Consumer mockValueAction() { + return (Consumer) mock(Consumer.class); } - @Test - void failedWithReason() { - var resolved = Resolved.failed(new IllegalStateException("5150")); - assertThatIllegalStateException().isThrownBy(resolved::orElseThrow).withMessage("5150"); - assertThat(resolved.get()).isEmpty(); + @SuppressWarnings("unchecked") + static Consumer mockErrorAction() { + return (Consumer) mock(Consumer.class); } - @Test - void failedWithAdditionalMessage() { - var resolved = Resolved.failed(new IllegalStateException("5150")); - assertThatRuntimeException().isThrownBy(() -> resolved.orElseThrow(() -> "extra message")) - .withMessage("extra message") - .withCause(new IllegalStateException("5150")); - assertThat(resolved.get()).isEmpty(); + @Nested + class WhenResolutionSucceeds { + + @Test + void valueDoesReturnValue() { + var resolved = Resolved.of("smile"); + assertThat(resolved.value()).hasValue("smile"); + } + + @Test + void orElseThrowDoesReturnValue() { + var resolved = Resolved.of("smile"); + assertThat(resolved.orElseThrow()).isEqualTo("smile"); + } + + @Test + void exceptionDoesReturnEmpty() { + var resolved = Resolved.of("smile"); + assertThat(resolved.exception()).isEmpty(); + } + + @Test + void ifResolvedDoesCallValueAction() { + var resolved = Resolved.of("smile"); + var valueAction = mockValueAction(); + resolved.ifResolved(valueAction); + verify(valueAction).accept("smile"); + } + + @Test + void ifResolvedOrElseDoesCallValueActionAndIgnoresErrorAction() { + var resolved = Resolved.of("smile"); + var valueAction = mockValueAction(); + var errorAction = mockErrorAction(); + resolved.ifResolvedOrElse(valueAction, errorAction); + verify(valueAction).accept("smile"); + verifyNoInteractions(errorAction); + } + + } + + @Nested + class WhenResolutionFails { + + @Test + void valueDoesReturnEmpty() { + var resolved = Resolved.failed("5150"); + assertThat(resolved.value()).isEmpty(); + } + + @Test + void orElseThrowDoesThrowSimpleReason() { + var resolved = Resolved.failed("5150"); + assertThatIllegalArgumentException().isThrownBy(resolved::orElseThrow).withMessage("5150"); + } + + @Test + void orElseThrowDoesThrowReason() { + var resolved = Resolved.failed(new IllegalStateException("5150")); + assertThatIllegalStateException().isThrownBy(resolved::orElseThrow).withMessage("5150"); + } + + @Test + void orElseThrowDoesThrowReasonWithExtraMessage() { + var resolved = Resolved.failed(new IllegalStateException("5150")); + assertThatRuntimeException().isThrownBy(() -> resolved.orElseThrow(() -> "extra message")) + .withMessage("extra message") + .withCause(new IllegalStateException("5150")); + } + + @Test + void exceptionDoesReturnReason() { + var resolved = Resolved.failed("5150"); + assertThat(resolved.exception()).hasValueSatisfying( + (ex) -> assertThat(ex).isInstanceOf(IllegalArgumentException.class).hasMessage("5150")); + } + + @Test + void ifResolvedDoesNotCallValueAction() { + var resolved = Resolved.failed("5150"); + var valueAction = mockValueAction(); + resolved.ifResolved(valueAction); + verifyNoInteractions(valueAction); + } + + @Test + void ifResolvedOrElseDoesIgnoreValueActionAndCallsErrorAction() { + var resolved = Resolved.failed("5150"); + var valueAction = mockValueAction(); + var errorAction = mockErrorAction(); + resolved.ifResolvedOrElse(valueAction, errorAction); + verifyNoInteractions(valueAction); + verify(errorAction).accept(resolved.exception().get()); + } + } } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerAutoConsumeSchemaTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerAutoConsumeSchemaTests.java new file mode 100644 index 00000000..3dba7161 --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerAutoConsumeSchemaTests.java @@ -0,0 +1,191 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.jupiter.api.Test; + +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.listener.PulsarListenerAutoConsumeSchemaTests.PulsarListenerAutoConsumeSchemaTestsConfig; +import org.springframework.pulsar.test.support.model.UserPojo; +import org.springframework.pulsar.test.support.model.UserRecord; +import org.springframework.test.context.ContextConfiguration; + +/** + * Tests for {@link PulsarListener @PulsarListener} using {@code schemaType} of + * {@link SchemaType#AUTO_CONSUME}. + * + * @author Chris Bono + */ +@ContextConfiguration(classes = PulsarListenerAutoConsumeSchemaTestsConfig.class) +class PulsarListenerAutoConsumeSchemaTests extends PulsarListenerTestsBase { + + static final String STRING_TOPIC = "placst-str-topic"; + static CountDownLatch stringLatch = new CountDownLatch(3); + static List stringMessages = new ArrayList<>(); + + static final String JSON_TOPIC = "placst-json-topic"; + static CountDownLatch jsonLatch = new CountDownLatch(3); + static List> jsonMessages = new ArrayList<>(); + + static final String AVRO_TOPIC = "placst-avro-topic"; + static CountDownLatch avroLatch = new CountDownLatch(3); + static List> avroMessages = new ArrayList<>(); + + static final String KEYVALUE_TOPIC = "placst-kv-topic"; + static CountDownLatch keyValueLatch = new CountDownLatch(3); + static List> keyValueMessages = new ArrayList<>(); + + @Test + void stringSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var expectedMessages = new ArrayList(); + for (int i = 0; i < 3; i++) { + var msg = "str-" + i; + template.send(STRING_TOPIC, msg, Schema.STRING); + expectedMessages.add(msg); + } + assertThat(stringLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(stringMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void jsonSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = JSONSchema.of(UserRecord.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserRecord("Jason", i); + template.send(JSON_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.name(), "age", user.age())); + } + assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(jsonMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void avroSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = AvroSchema.of(UserPojo.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserPojo("Avi", i); + template.send(AVRO_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.getName(), "age", user.getAge())); + } + assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(avroMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void keyValueSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory>(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var kv = new KeyValue<>("Kevin", i); + template.send(KEYVALUE_TOPIC, kv, kvSchema); + expectedMessages.add(Map.of(kv.getKey(), kv.getValue())); + } + assertThat(keyValueLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(keyValueMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @EnablePulsar + @Configuration + static class PulsarListenerAutoConsumeSchemaTestsConfig { + + @PulsarListener(id = "stringAcListener", topics = STRING_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + properties = { "subscriptionInitialPosition=Earliest" }) + void listenString(Message genericMessage) { + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(String.class); + stringMessages.add(genericMessage.getValue().getNativeObject().toString()); + stringLatch.countDown(); + } + + @PulsarListener(id = "jsonAcListener", topics = JSON_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + properties = { "subscriptionInitialPosition=Earliest" }) + void listenJson(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericJsonRecord.class); + GenericJsonRecord record = (GenericJsonRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.JSON); + assertThat(record).extracting("schemaInfo") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\": \"UserRecord\"")); + jsonMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + jsonLatch.countDown(); + } + + @PulsarListener(id = "avroAcListener", topics = AVRO_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + properties = { "subscriptionInitialPosition=Earliest" }) + void listenAvro(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericAvroRecord.class); + GenericAvroRecord record = (GenericAvroRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.AVRO); + assertThat(record).extracting("schema") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\":\"UserPojo\"")); + avroMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + avroLatch.countDown(); + } + + @SuppressWarnings("unchecked") + @PulsarListener(id = "keyvalueAcListener", topics = KEYVALUE_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + properties = { "subscriptionInitialPosition=Earliest" }) + void listenKeyvalue(Message genericMessage) { + assertThat(genericMessage.getValue().getSchemaType()).isEqualTo(SchemaType.KEY_VALUE); + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(KeyValue.class); + var record = (KeyValue) genericMessage.getValue().getNativeObject(); + keyValueMessages.add(Map.of(record.getKey(), record.getValue())); + keyValueLatch.countDown(); + } + + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderAutoConsumeSchemaTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderAutoConsumeSchemaTests.java new file mode 100644 index 00000000..54690417 --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderAutoConsumeSchemaTests.java @@ -0,0 +1,191 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.jupiter.api.Test; + +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.annotation.PulsarReader; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.reader.PulsarReaderAutoConsumeSchemaTests.PulsarReaderAutoConsumeSchemaTestsConfig; +import org.springframework.pulsar.test.support.model.UserPojo; +import org.springframework.pulsar.test.support.model.UserRecord; +import org.springframework.test.context.ContextConfiguration; + +/** + * Tests for {@link PulsarReader @PulsarReader} using {@code schemaType} of + * {@link SchemaType#AUTO_CONSUME}. + * + * @author Chris Bono + */ +@ContextConfiguration(classes = PulsarReaderAutoConsumeSchemaTestsConfig.class) +class PulsarReaderAutoConsumeSchemaTests extends PulsarReaderTestsBase { + + static final String STRING_TOPIC = "pracst-str-topic"; + static CountDownLatch stringLatch = new CountDownLatch(3); + static List stringMessages = new ArrayList<>(); + + static final String JSON_TOPIC = "pracst-json-topic"; + static CountDownLatch jsonLatch = new CountDownLatch(3); + static List> jsonMessages = new ArrayList<>(); + + static final String AVRO_TOPIC = "pracst-avro-topic"; + static CountDownLatch avroLatch = new CountDownLatch(3); + static List> avroMessages = new ArrayList<>(); + + static final String KEYVALUE_TOPIC = "pracst-kv-topic"; + static CountDownLatch keyValueLatch = new CountDownLatch(3); + static List> keyValueMessages = new ArrayList<>(); + + @Test + void stringSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var expectedMessages = new ArrayList(); + for (int i = 0; i < 3; i++) { + var msg = "str-" + i; + template.send(STRING_TOPIC, msg, Schema.STRING); + expectedMessages.add(msg); + } + assertThat(stringLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(stringMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void jsonSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = JSONSchema.of(UserRecord.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserRecord("Jason", i); + template.send(JSON_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.name(), "age", user.age())); + } + assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(jsonMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void avroSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = AvroSchema.of(UserPojo.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserPojo("Avi", i); + template.send(AVRO_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.getName(), "age", user.getAge())); + } + assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(avroMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void keyValueSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory>(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var kv = new KeyValue<>("Kevin", i); + template.send(KEYVALUE_TOPIC, kv, kvSchema); + expectedMessages.add(Map.of(kv.getKey(), kv.getValue())); + } + assertThat(keyValueLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(keyValueMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @EnablePulsar + @Configuration + static class PulsarReaderAutoConsumeSchemaTestsConfig { + + @PulsarReader(id = "stringAcListener", topics = STRING_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + startMessageId = "earliest") + void listenString(Message genericMessage) { + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(String.class); + stringMessages.add(genericMessage.getValue().getNativeObject().toString()); + stringLatch.countDown(); + } + + @PulsarReader(id = "jsonAcListener", topics = JSON_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + startMessageId = "earliest") + void listenJson(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericJsonRecord.class); + GenericJsonRecord record = (GenericJsonRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.JSON); + assertThat(record).extracting("schemaInfo") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\": \"UserRecord\"")); + jsonMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + jsonLatch.countDown(); + } + + @PulsarReader(id = "avroAcListener", topics = AVRO_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + startMessageId = "earliest") + void listenAvro(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericAvroRecord.class); + GenericAvroRecord record = (GenericAvroRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.AVRO); + assertThat(record).extracting("schema") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\":\"UserPojo\"")); + avroMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + avroLatch.countDown(); + } + + @SuppressWarnings("unchecked") + @PulsarReader(id = "keyvalueAcListener", topics = KEYVALUE_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + startMessageId = "earliest") + void listenKeyvalue(Message genericMessage) { + assertThat(genericMessage.getValue().getSchemaType()).isEqualTo(SchemaType.KEY_VALUE); + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(KeyValue.class); + var record = (KeyValue) genericMessage.getValue().getNativeObject(); + keyValueMessages.add(Map.of(record.getKey(), record.getValue())); + keyValueLatch.countDown(); + } + + } + +}