From 92bfe7cb5550cc3a96b7a99208d2e3cb57feccfc Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Mon, 29 Jan 2024 23:11:19 -0600 Subject: [PATCH] Add support for AUTO_CONSUME schema type See #380 --- .../MethodReactivePulsarListenerEndpoint.java | 31 ++- ...ePulsarListenerAutoConsumeSchemaTests.java | 205 ++++++++++++++++++ .../pulsar/test/support/model/UserPojo.java | 79 +++++++ .../pulsar/test/support/model/UserRecord.java | 26 +++ .../config/MethodPulsarListenerEndpoint.java | 26 ++- .../config/MethodPulsarReaderEndpoint.java | 28 ++- .../pulsar/core/DefaultSchemaResolver.java | 3 +- .../core/DefaultSchemaResolverTests.java | 16 +- .../PulsarListenerAutoConsumeSchemaTests.java | 191 ++++++++++++++++ .../PulsarReaderAutoConsumeSchemaTests.java | 191 ++++++++++++++++ 10 files changed, 770 insertions(+), 26 deletions(-) create mode 100644 spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerAutoConsumeSchemaTests.java create mode 100644 spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserPojo.java create mode 100644 spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserRecord.java create mode 100644 spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerAutoConsumeSchemaTests.java create mode 100644 spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderAutoConsumeSchemaTests.java diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java index d39386ff..585a6f40 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,11 +26,11 @@ import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaType; import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; +import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.SmartMessageConverter; @@ -65,6 +65,8 @@ */ public class MethodReactivePulsarListenerEndpoint extends AbstractReactivePulsarListenerEndpoint { + private final LogAccessor logger = new LogAccessor(this.getClass()); + private Object bean; private Method method; @@ -134,18 +136,29 @@ protected AbstractPulsarMessageToSpringMessageAdapter createMessageHandler( messageParameter = parameter.get(); } - DefaultReactivePulsarMessageListenerContainer containerInstance = (DefaultReactivePulsarMessageListenerContainer) container; - ReactivePulsarContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties(); + DefaultReactivePulsarMessageListenerContainer containerInstance = (DefaultReactivePulsarMessageListenerContainer) container; + ReactivePulsarContainerProperties pulsarContainerProperties = containerInstance + .getContainerProperties(); + + // Resolve the schema using the reader schema type SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver(); SchemaType schemaType = pulsarContainerProperties.getSchemaType(); ResolvableType messageType = resolvableType(messageParameter); schemaResolver.resolveSchema(schemaType, messageType) - .ifResolved(schema -> pulsarContainerProperties.setSchema((Schema) schema)); - - // Make sure the schemaType is updated to match the current schema + .ifResolvedOrElse(pulsarContainerProperties::setSchema, + (ex) -> this.logger + .warn(() -> "Failed to resolve schema for type %s - will default to BYTES (due to: %s)" + .formatted(schemaType, ex.getMessage()))); + + // Attempt to make sure the schemaType is updated to match the resolved schema. + // This can occur when the resolver returns a schema that is not necessarily of + // the same type as the input scheme type (e.g. SchemaType.NONE uses the message + // type to determine the schema. if (pulsarContainerProperties.getSchema() != null) { - SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType(); - pulsarContainerProperties.setSchemaType(type); + var schemaInfo = pulsarContainerProperties.getSchema().getSchemaInfo(); + if (schemaInfo != null) { + pulsarContainerProperties.setSchemaType(schemaInfo.getType()); + } } // If no topic info is set on endpoint attempt to resolve via message type diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerAutoConsumeSchemaTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerAutoConsumeSchemaTests.java new file mode 100644 index 00000000..3d0f4bb1 --- /dev/null +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerAutoConsumeSchemaTests.java @@ -0,0 +1,205 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reactive.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.jupiter.api.Test; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerAutoConsumeSchemaTests.ReactivePulsarListenerAutoConsumeSchemaTestsConfig; +import org.springframework.pulsar.test.support.model.UserPojo; +import org.springframework.pulsar.test.support.model.UserRecord; +import org.springframework.test.context.ContextConfiguration; + +import reactor.core.publisher.Mono; + +/** + * Tests for {@link ReactivePulsarListener @ReactivePulsarListener} using + * {@code schemaType} of {@link SchemaType#AUTO_CONSUME}. + * + * @author Chris Bono + */ +@ContextConfiguration(classes = ReactivePulsarListenerAutoConsumeSchemaTestsConfig.class) +class ReactivePulsarListenerAutoConsumeSchemaTests extends ReactivePulsarListenerTestsBase { + + static final String STRING_TOPIC = "placst-str-topic"; + static CountDownLatch stringLatch = new CountDownLatch(3); + static List stringMessages = new ArrayList<>(); + + static final String JSON_TOPIC = "placst-json-topic"; + static CountDownLatch jsonLatch = new CountDownLatch(3); + static List> jsonMessages = new ArrayList<>(); + + static final String AVRO_TOPIC = "placst-avro-topic"; + static CountDownLatch avroLatch = new CountDownLatch(3); + static List> avroMessages = new ArrayList<>(); + + static final String KEYVALUE_TOPIC = "placst-kv-topic"; + static CountDownLatch keyValueLatch = new CountDownLatch(3); + static List> keyValueMessages = new ArrayList<>(); + + @Test + void stringSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var expectedMessages = new ArrayList(); + for (int i = 0; i < 3; i++) { + var msg = "str-" + i; + template.send(STRING_TOPIC, msg, Schema.STRING); + expectedMessages.add(msg); + } + assertThat(stringLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(stringMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void jsonSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = JSONSchema.of(UserRecord.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserRecord("Jason", i); + template.send(JSON_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.name(), "age", user.age())); + } + assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(jsonMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void avroSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = AvroSchema.of(UserPojo.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserPojo("Avi", i); + template.send(AVRO_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.getName(), "age", user.getAge())); + } + assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(avroMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void keyValueSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory>(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var kv = new KeyValue<>("Kevin", i); + template.send(KEYVALUE_TOPIC, kv, kvSchema); + expectedMessages.add(Map.of(kv.getKey(), kv.getValue())); + } + assertThat(keyValueLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(keyValueMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @EnablePulsar + @Configuration + static class ReactivePulsarListenerAutoConsumeSchemaTestsConfig { + + @ReactivePulsarListener(id = "stringAcListener", topics = STRING_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + consumerCustomizer = "earliestCustomizer") + Mono listenString(Message genericMessage) { + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(String.class); + stringMessages.add(genericMessage.getValue().getNativeObject().toString()); + stringLatch.countDown(); + return Mono.empty(); + } + + @ReactivePulsarListener(id = "jsonAcListener", topics = JSON_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + consumerCustomizer = "earliestCustomizer") + Mono listenJson(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericJsonRecord.class); + GenericJsonRecord record = (GenericJsonRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.JSON); + assertThat(record).extracting("schemaInfo") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\": \"UserRecord\"")); + jsonMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + jsonLatch.countDown(); + return Mono.empty(); + } + + @ReactivePulsarListener(id = "avroAcListener", topics = AVRO_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + consumerCustomizer = "earliestCustomizer") + Mono listenAvro(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericAvroRecord.class); + GenericAvroRecord record = (GenericAvroRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.AVRO); + assertThat(record).extracting("schema") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\":\"UserPojo\"")); + avroMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + avroLatch.countDown(); + return Mono.empty(); + } + + @SuppressWarnings("unchecked") + @ReactivePulsarListener(id = "keyvalueAcListener", topics = KEYVALUE_TOPIC, + schemaType = SchemaType.AUTO_CONSUME, consumerCustomizer = "earliestCustomizer") + Mono listenKeyvalue(Message genericMessage) { + assertThat(genericMessage.getValue().getSchemaType()).isEqualTo(SchemaType.KEY_VALUE); + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(KeyValue.class); + var record = (KeyValue) genericMessage.getValue().getNativeObject(); + keyValueMessages.add(Map.of(record.getKey(), record.getValue())); + keyValueLatch.countDown(); + return Mono.empty(); + } + + @Bean + ReactivePulsarListenerMessageConsumerBuilderCustomizer earliestCustomizer() { + return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); + } + + } + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserPojo.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserPojo.java new file mode 100644 index 00000000..f3f2d939 --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserPojo.java @@ -0,0 +1,79 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support.model; + +import java.util.Objects; + +/** + * Test object (user) defined via standard Java beans get/set methods. + *

+ * WARN Do not convert this to a Record as this is used for Avro tests and Avro + * does not work well w/ records yet. + */ +public class UserPojo { + + private String name; + + private int age; + + UserPojo() { + } + + public UserPojo(String name, int age) { + this.name = name; + this.age = age; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UserPojo user = (UserPojo) o; + return age == user.age && Objects.equals(name, user.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, age); + } + + @Override + public String toString() { + return "User{" + "name='" + name + '\'' + ", age=" + age + '}'; + } + +} diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserRecord.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserRecord.java new file mode 100644 index 00000000..2cf3c0bc --- /dev/null +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/model/UserRecord.java @@ -0,0 +1,26 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.test.support.model; + +/** + * Test object (user) defined via a Java record. + * + * @param name the user's name + * @param age the user's age + */ +public record UserRecord(String name, int age) { +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java index 885c0835..03fa4cd8 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; +import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.SmartMessageConverter; @@ -66,6 +67,8 @@ */ public class MethodPulsarListenerEndpoint extends AbstractPulsarListenerEndpoint { + private final LogAccessor logger = new LogAccessor(this.getClass()); + private Object bean; private Method method; @@ -140,15 +143,26 @@ protected AbstractPulsarMessageToSpringMessageAdapter createMessageListener( ConcurrentPulsarMessageListenerContainer containerInstance = (ConcurrentPulsarMessageListenerContainer) container; PulsarContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties(); + + // Resolve the schema using the listener schema type SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver(); SchemaType schemaType = pulsarContainerProperties.getSchemaType(); ResolvableType messageType = resolvableType(messageParameter); - schemaResolver.resolveSchema(schemaType, messageType).ifResolved(pulsarContainerProperties::setSchema); - - // Make sure the schemaType is updated to match the current schema + schemaResolver.resolveSchema(schemaType, messageType) + .ifResolvedOrElse(pulsarContainerProperties::setSchema, + (ex) -> this.logger + .warn(() -> "Failed to resolve schema for type %s - will default to BYTES (due to: %s)" + .formatted(schemaType, ex.getMessage()))); + + // Attempt to make sure the schemaType is updated to match the resolved schema. + // This can occur when the resolver returns a schema that is not necessarily of + // the same type as the input scheme type (e.g. SchemaType.NONE uses the message + // type to determine the schema. if (pulsarContainerProperties.getSchema() != null) { - SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType(); - pulsarContainerProperties.setSchemaType(type); + var schemaInfo = pulsarContainerProperties.getSchema().getSchemaInfo(); + if (schemaInfo != null) { + pulsarContainerProperties.setSchemaType(schemaInfo.getType()); + } } // If no topic info is set on endpoint attempt to resolve via message type diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java index 46675af7..b38db417 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; +import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.SmartMessageConverter; @@ -56,6 +57,8 @@ */ public class MethodPulsarReaderEndpoint extends AbstractPulsarReaderEndpoint { + private final LogAccessor logger = new LogAccessor(this.getClass()); + private Object bean; private Method method; @@ -114,21 +117,30 @@ protected AbstractPulsarMessageToSpringMessageAdapter createReaderListener( DefaultPulsarMessageReaderContainer containerInstance = (DefaultPulsarMessageReaderContainer) container; PulsarReaderContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties(); + + // Resolve the schema using the reader schema type SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver(); SchemaType schemaType = pulsarContainerProperties.getSchemaType(); ResolvableType messageType = resolvableType(messageParameter); - schemaResolver.resolveSchema(schemaType, messageType).ifResolved(pulsarContainerProperties::setSchema); - - // Make sure the schemaType is updated to match the current schema + schemaResolver.resolveSchema(schemaType, messageType) + .ifResolvedOrElse(pulsarContainerProperties::setSchema, + (ex) -> this.logger + .warn(() -> "Failed to resolve schema for type %s - will default to BYTES (due to: %s)" + .formatted(schemaType, ex.getMessage()))); + + // Attempt to make sure the schemaType is updated to match the resolved schema. + // This can occur when the resolver returns a schema that is not necessarily of + // the same type as the input scheme type (e.g. SchemaType.NONE uses the message + // type to determine the schema. if (pulsarContainerProperties.getSchema() != null) { - SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType(); - pulsarContainerProperties.setSchemaType(type); + var schemaInfo = pulsarContainerProperties.getSchema().getSchemaInfo(); + if (schemaInfo != null) { + pulsarContainerProperties.setSchemaType(schemaInfo.getType()); + } } // TODO: If no topic info is set on endpoint attempt to resolve via message type - container.setReaderCustomizer(this.readerBuilderCustomizer); - return readerListener; } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java index bdc730cb..802ffbc1 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -185,6 +185,7 @@ public Resolved> resolveSchema(SchemaType schemaType, @Nullable Re requireNonNullMessageType(schemaType, messageType); yield getMessageKeyValueSchema(messageType); } + case AUTO_CONSUME -> Schema.AUTO_CONSUME(); case NONE -> { if (messageType == null || messageType.getRawClass() == null) { yield Schema.BYTES; diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java index 24307d17..09e6799d 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.KeyValueSchema; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.ProtobufSchema; @@ -255,6 +256,17 @@ void structSchemas() { })); } + @Test + void autoConsumeSchema() { + assertThat(resolver.resolveSchema(SchemaType.AUTO_CONSUME, ResolvableType.forType(Foo.class)).orElseThrow()) + .isInstanceOf(AutoConsumeSchema.class); + assertThat( + resolver.resolveSchema(SchemaType.AUTO_CONSUME, ResolvableType.forType(String.class)).orElseThrow()) + .isInstanceOf(AutoConsumeSchema.class); + assertThat(resolver.resolveSchema(SchemaType.AUTO_CONSUME, null).orElseThrow()) + .isInstanceOf(AutoConsumeSchema.class); + } + @ParameterizedTest @EnumSource(value = SchemaType.class, names = { "JSON", "AVRO", "PROTOBUF", "KEY_VALUE" }) void structSchemasRequireMessageType(SchemaType schemaType) { @@ -264,7 +276,7 @@ void structSchemasRequireMessageType(SchemaType schemaType) { } @ParameterizedTest - @EnumSource(value = SchemaType.class, names = { "PROTOBUF_NATIVE", "AUTO", "AUTO_CONSUME", "AUTO_PUBLISH" }) + @EnumSource(value = SchemaType.class, names = { "PROTOBUF_NATIVE", "AUTO", "AUTO_PUBLISH" }) void unsupportedSchemaTypes(SchemaType unsupportedType) { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> resolver.resolveSchema(unsupportedType, null).orElseThrow()) diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerAutoConsumeSchemaTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerAutoConsumeSchemaTests.java new file mode 100644 index 00000000..3dba7161 --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerAutoConsumeSchemaTests.java @@ -0,0 +1,191 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.jupiter.api.Test; + +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.listener.PulsarListenerAutoConsumeSchemaTests.PulsarListenerAutoConsumeSchemaTestsConfig; +import org.springframework.pulsar.test.support.model.UserPojo; +import org.springframework.pulsar.test.support.model.UserRecord; +import org.springframework.test.context.ContextConfiguration; + +/** + * Tests for {@link PulsarListener @PulsarListener} using {@code schemaType} of + * {@link SchemaType#AUTO_CONSUME}. + * + * @author Chris Bono + */ +@ContextConfiguration(classes = PulsarListenerAutoConsumeSchemaTestsConfig.class) +class PulsarListenerAutoConsumeSchemaTests extends PulsarListenerTestsBase { + + static final String STRING_TOPIC = "placst-str-topic"; + static CountDownLatch stringLatch = new CountDownLatch(3); + static List stringMessages = new ArrayList<>(); + + static final String JSON_TOPIC = "placst-json-topic"; + static CountDownLatch jsonLatch = new CountDownLatch(3); + static List> jsonMessages = new ArrayList<>(); + + static final String AVRO_TOPIC = "placst-avro-topic"; + static CountDownLatch avroLatch = new CountDownLatch(3); + static List> avroMessages = new ArrayList<>(); + + static final String KEYVALUE_TOPIC = "placst-kv-topic"; + static CountDownLatch keyValueLatch = new CountDownLatch(3); + static List> keyValueMessages = new ArrayList<>(); + + @Test + void stringSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var expectedMessages = new ArrayList(); + for (int i = 0; i < 3; i++) { + var msg = "str-" + i; + template.send(STRING_TOPIC, msg, Schema.STRING); + expectedMessages.add(msg); + } + assertThat(stringLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(stringMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void jsonSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = JSONSchema.of(UserRecord.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserRecord("Jason", i); + template.send(JSON_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.name(), "age", user.age())); + } + assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(jsonMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void avroSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = AvroSchema.of(UserPojo.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserPojo("Avi", i); + template.send(AVRO_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.getName(), "age", user.getAge())); + } + assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(avroMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void keyValueSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory>(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var kv = new KeyValue<>("Kevin", i); + template.send(KEYVALUE_TOPIC, kv, kvSchema); + expectedMessages.add(Map.of(kv.getKey(), kv.getValue())); + } + assertThat(keyValueLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(keyValueMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @EnablePulsar + @Configuration + static class PulsarListenerAutoConsumeSchemaTestsConfig { + + @PulsarListener(id = "stringAcListener", topics = STRING_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + properties = { "subscriptionInitialPosition=Earliest" }) + void listenString(Message genericMessage) { + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(String.class); + stringMessages.add(genericMessage.getValue().getNativeObject().toString()); + stringLatch.countDown(); + } + + @PulsarListener(id = "jsonAcListener", topics = JSON_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + properties = { "subscriptionInitialPosition=Earliest" }) + void listenJson(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericJsonRecord.class); + GenericJsonRecord record = (GenericJsonRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.JSON); + assertThat(record).extracting("schemaInfo") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\": \"UserRecord\"")); + jsonMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + jsonLatch.countDown(); + } + + @PulsarListener(id = "avroAcListener", topics = AVRO_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + properties = { "subscriptionInitialPosition=Earliest" }) + void listenAvro(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericAvroRecord.class); + GenericAvroRecord record = (GenericAvroRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.AVRO); + assertThat(record).extracting("schema") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\":\"UserPojo\"")); + avroMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + avroLatch.countDown(); + } + + @SuppressWarnings("unchecked") + @PulsarListener(id = "keyvalueAcListener", topics = KEYVALUE_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + properties = { "subscriptionInitialPosition=Earliest" }) + void listenKeyvalue(Message genericMessage) { + assertThat(genericMessage.getValue().getSchemaType()).isEqualTo(SchemaType.KEY_VALUE); + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(KeyValue.class); + var record = (KeyValue) genericMessage.getValue().getNativeObject(); + keyValueMessages.add(Map.of(record.getKey(), record.getValue())); + keyValueLatch.countDown(); + } + + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderAutoConsumeSchemaTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderAutoConsumeSchemaTests.java new file mode 100644 index 00000000..54690417 --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderAutoConsumeSchemaTests.java @@ -0,0 +1,191 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.jupiter.api.Test; + +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.annotation.PulsarReader; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.reader.PulsarReaderAutoConsumeSchemaTests.PulsarReaderAutoConsumeSchemaTestsConfig; +import org.springframework.pulsar.test.support.model.UserPojo; +import org.springframework.pulsar.test.support.model.UserRecord; +import org.springframework.test.context.ContextConfiguration; + +/** + * Tests for {@link PulsarReader @PulsarReader} using {@code schemaType} of + * {@link SchemaType#AUTO_CONSUME}. + * + * @author Chris Bono + */ +@ContextConfiguration(classes = PulsarReaderAutoConsumeSchemaTestsConfig.class) +class PulsarReaderAutoConsumeSchemaTests extends PulsarReaderTestsBase { + + static final String STRING_TOPIC = "pracst-str-topic"; + static CountDownLatch stringLatch = new CountDownLatch(3); + static List stringMessages = new ArrayList<>(); + + static final String JSON_TOPIC = "pracst-json-topic"; + static CountDownLatch jsonLatch = new CountDownLatch(3); + static List> jsonMessages = new ArrayList<>(); + + static final String AVRO_TOPIC = "pracst-avro-topic"; + static CountDownLatch avroLatch = new CountDownLatch(3); + static List> avroMessages = new ArrayList<>(); + + static final String KEYVALUE_TOPIC = "pracst-kv-topic"; + static CountDownLatch keyValueLatch = new CountDownLatch(3); + static List> keyValueMessages = new ArrayList<>(); + + @Test + void stringSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var expectedMessages = new ArrayList(); + for (int i = 0; i < 3; i++) { + var msg = "str-" + i; + template.send(STRING_TOPIC, msg, Schema.STRING); + expectedMessages.add(msg); + } + assertThat(stringLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(stringMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void jsonSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = JSONSchema.of(UserRecord.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserRecord("Jason", i); + template.send(JSON_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.name(), "age", user.age())); + } + assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(jsonMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void avroSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var schema = AvroSchema.of(UserPojo.class); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var user = new UserPojo("Avi", i); + template.send(AVRO_TOPIC, user, schema); + expectedMessages.add(Map.of("name", user.getName(), "age", user.getAge())); + } + assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(avroMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @Test + void keyValueSchema() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory>(pulsarClient); + var template = new PulsarTemplate<>(pulsarProducerFactory); + var kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE); + var expectedMessages = new ArrayList>(); + for (int i = 0; i < 3; i++) { + var kv = new KeyValue<>("Kevin", i); + template.send(KEYVALUE_TOPIC, kv, kvSchema); + expectedMessages.add(Map.of(kv.getKey(), kv.getValue())); + } + assertThat(keyValueLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(keyValueMessages).containsExactlyInAnyOrderElementsOf(expectedMessages); + } + + @EnablePulsar + @Configuration + static class PulsarReaderAutoConsumeSchemaTestsConfig { + + @PulsarReader(id = "stringAcListener", topics = STRING_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + startMessageId = "earliest") + void listenString(Message genericMessage) { + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(String.class); + stringMessages.add(genericMessage.getValue().getNativeObject().toString()); + stringLatch.countDown(); + } + + @PulsarReader(id = "jsonAcListener", topics = JSON_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + startMessageId = "earliest") + void listenJson(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericJsonRecord.class); + GenericJsonRecord record = (GenericJsonRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.JSON); + assertThat(record).extracting("schemaInfo") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\": \"UserRecord\"")); + jsonMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + jsonLatch.countDown(); + } + + @PulsarReader(id = "avroAcListener", topics = AVRO_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + startMessageId = "earliest") + void listenAvro(Message genericMessage) { + assertThat(genericMessage.getValue()).isInstanceOf(GenericAvroRecord.class); + GenericAvroRecord record = (GenericAvroRecord) genericMessage.getValue(); + assertThat(record.getSchemaType()).isEqualTo(SchemaType.AVRO); + assertThat(record).extracting("schema") + .satisfies((obj) -> assertThat(obj.toString()).contains("\"name\":\"UserPojo\"")); + avroMessages.add(record.getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toMap(Function.identity(), record::getField))); + avroLatch.countDown(); + } + + @SuppressWarnings("unchecked") + @PulsarReader(id = "keyvalueAcListener", topics = KEYVALUE_TOPIC, schemaType = SchemaType.AUTO_CONSUME, + startMessageId = "earliest") + void listenKeyvalue(Message genericMessage) { + assertThat(genericMessage.getValue().getSchemaType()).isEqualTo(SchemaType.KEY_VALUE); + assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(KeyValue.class); + var record = (KeyValue) genericMessage.getValue().getNativeObject(); + keyValueMessages.add(Map.of(record.getKey(), record.getValue())); + keyValueLatch.countDown(); + } + + } + +}