Skip to content

Commit

Permalink
Add support for AUTO_CONSUME schema type
Browse files Browse the repository at this point in the history
See #380
  • Loading branch information
onobc committed Feb 2, 2024
1 parent 80f9b49 commit 92bfe7c
Show file tree
Hide file tree
Showing 10 changed files with 770 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,11 +26,11 @@
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.SmartMessageConverter;
Expand Down Expand Up @@ -65,6 +65,8 @@
*/
public class MethodReactivePulsarListenerEndpoint<V> extends AbstractReactivePulsarListenerEndpoint<V> {

private final LogAccessor logger = new LogAccessor(this.getClass());

private Object bean;

private Method method;
Expand Down Expand Up @@ -134,18 +136,29 @@ protected AbstractPulsarMessageToSpringMessageAdapter<V> createMessageHandler(
messageParameter = parameter.get();
}

DefaultReactivePulsarMessageListenerContainer<?> containerInstance = (DefaultReactivePulsarMessageListenerContainer<?>) container;
ReactivePulsarContainerProperties<?> pulsarContainerProperties = containerInstance.getContainerProperties();
DefaultReactivePulsarMessageListenerContainer<Object> containerInstance = (DefaultReactivePulsarMessageListenerContainer<Object>) container;
ReactivePulsarContainerProperties<Object> pulsarContainerProperties = containerInstance
.getContainerProperties();

// Resolve the schema using the reader schema type
SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver();
SchemaType schemaType = pulsarContainerProperties.getSchemaType();
ResolvableType messageType = resolvableType(messageParameter);
schemaResolver.resolveSchema(schemaType, messageType)
.ifResolved(schema -> pulsarContainerProperties.setSchema((Schema) schema));

// Make sure the schemaType is updated to match the current schema
.ifResolvedOrElse(pulsarContainerProperties::setSchema,
(ex) -> this.logger
.warn(() -> "Failed to resolve schema for type %s - will default to BYTES (due to: %s)"
.formatted(schemaType, ex.getMessage())));

// Attempt to make sure the schemaType is updated to match the resolved schema.
// This can occur when the resolver returns a schema that is not necessarily of
// the same type as the input scheme type (e.g. SchemaType.NONE uses the message
// type to determine the schema.
if (pulsarContainerProperties.getSchema() != null) {
SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType();
pulsarContainerProperties.setSchemaType(type);
var schemaInfo = pulsarContainerProperties.getSchema().getSchemaInfo();
if (schemaInfo != null) {
pulsarContainerProperties.setSchemaType(schemaInfo.getType());
}
}

// If no topic info is set on endpoint attempt to resolve via message type
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.reactive.listener;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.jupiter.api.Test;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerAutoConsumeSchemaTests.ReactivePulsarListenerAutoConsumeSchemaTestsConfig;
import org.springframework.pulsar.test.support.model.UserPojo;
import org.springframework.pulsar.test.support.model.UserRecord;
import org.springframework.test.context.ContextConfiguration;

import reactor.core.publisher.Mono;

/**
* Tests for {@link ReactivePulsarListener @ReactivePulsarListener} using
* {@code schemaType} of {@link SchemaType#AUTO_CONSUME}.
*
* @author Chris Bono
*/
@ContextConfiguration(classes = ReactivePulsarListenerAutoConsumeSchemaTestsConfig.class)
class ReactivePulsarListenerAutoConsumeSchemaTests extends ReactivePulsarListenerTestsBase {

static final String STRING_TOPIC = "placst-str-topic";
static CountDownLatch stringLatch = new CountDownLatch(3);
static List<String> stringMessages = new ArrayList<>();

static final String JSON_TOPIC = "placst-json-topic";
static CountDownLatch jsonLatch = new CountDownLatch(3);
static List<Map<String, Object>> jsonMessages = new ArrayList<>();

static final String AVRO_TOPIC = "placst-avro-topic";
static CountDownLatch avroLatch = new CountDownLatch(3);
static List<Map<String, Object>> avroMessages = new ArrayList<>();

static final String KEYVALUE_TOPIC = "placst-kv-topic";
static CountDownLatch keyValueLatch = new CountDownLatch(3);
static List<Map<String, Object>> keyValueMessages = new ArrayList<>();

@Test
void stringSchema() throws Exception {
var pulsarProducerFactory = new DefaultPulsarProducerFactory<String>(pulsarClient);
var template = new PulsarTemplate<>(pulsarProducerFactory);
var expectedMessages = new ArrayList<String>();
for (int i = 0; i < 3; i++) {
var msg = "str-" + i;
template.send(STRING_TOPIC, msg, Schema.STRING);
expectedMessages.add(msg);
}
assertThat(stringLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(stringMessages).containsExactlyInAnyOrderElementsOf(expectedMessages);
}

@Test
void jsonSchema() throws Exception {
var pulsarProducerFactory = new DefaultPulsarProducerFactory<UserRecord>(pulsarClient);
var template = new PulsarTemplate<>(pulsarProducerFactory);
var schema = JSONSchema.of(UserRecord.class);
var expectedMessages = new ArrayList<Map<String, Object>>();
for (int i = 0; i < 3; i++) {
var user = new UserRecord("Jason", i);
template.send(JSON_TOPIC, user, schema);
expectedMessages.add(Map.of("name", user.name(), "age", user.age()));
}
assertThat(jsonLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(jsonMessages).containsExactlyInAnyOrderElementsOf(expectedMessages);
}

@Test
void avroSchema() throws Exception {
var pulsarProducerFactory = new DefaultPulsarProducerFactory<UserPojo>(pulsarClient);
var template = new PulsarTemplate<>(pulsarProducerFactory);
var schema = AvroSchema.of(UserPojo.class);
var expectedMessages = new ArrayList<Map<String, Object>>();
for (int i = 0; i < 3; i++) {
var user = new UserPojo("Avi", i);
template.send(AVRO_TOPIC, user, schema);
expectedMessages.add(Map.of("name", user.getName(), "age", user.getAge()));
}
assertThat(avroLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(avroMessages).containsExactlyInAnyOrderElementsOf(expectedMessages);
}

@Test
void keyValueSchema() throws Exception {
var pulsarProducerFactory = new DefaultPulsarProducerFactory<KeyValue<String, Integer>>(pulsarClient);
var template = new PulsarTemplate<>(pulsarProducerFactory);
var kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE);
var expectedMessages = new ArrayList<Map<String, Object>>();
for (int i = 0; i < 3; i++) {
var kv = new KeyValue<>("Kevin", i);
template.send(KEYVALUE_TOPIC, kv, kvSchema);
expectedMessages.add(Map.of(kv.getKey(), kv.getValue()));
}
assertThat(keyValueLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(keyValueMessages).containsExactlyInAnyOrderElementsOf(expectedMessages);
}

@EnablePulsar
@Configuration
static class ReactivePulsarListenerAutoConsumeSchemaTestsConfig {

@ReactivePulsarListener(id = "stringAcListener", topics = STRING_TOPIC, schemaType = SchemaType.AUTO_CONSUME,
consumerCustomizer = "earliestCustomizer")
Mono<Void> listenString(Message<GenericRecord> genericMessage) {
assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(String.class);
stringMessages.add(genericMessage.getValue().getNativeObject().toString());
stringLatch.countDown();
return Mono.empty();
}

@ReactivePulsarListener(id = "jsonAcListener", topics = JSON_TOPIC, schemaType = SchemaType.AUTO_CONSUME,
consumerCustomizer = "earliestCustomizer")
Mono<Void> listenJson(Message<GenericRecord> genericMessage) {
assertThat(genericMessage.getValue()).isInstanceOf(GenericJsonRecord.class);
GenericJsonRecord record = (GenericJsonRecord) genericMessage.getValue();
assertThat(record.getSchemaType()).isEqualTo(SchemaType.JSON);
assertThat(record).extracting("schemaInfo")
.satisfies((obj) -> assertThat(obj.toString()).contains("\"name\": \"UserRecord\""));
jsonMessages.add(record.getFields()
.stream()
.map(Field::getName)
.collect(Collectors.toMap(Function.identity(), record::getField)));
jsonLatch.countDown();
return Mono.empty();
}

@ReactivePulsarListener(id = "avroAcListener", topics = AVRO_TOPIC, schemaType = SchemaType.AUTO_CONSUME,
consumerCustomizer = "earliestCustomizer")
Mono<Void> listenAvro(Message<GenericRecord> genericMessage) {
assertThat(genericMessage.getValue()).isInstanceOf(GenericAvroRecord.class);
GenericAvroRecord record = (GenericAvroRecord) genericMessage.getValue();
assertThat(record.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(record).extracting("schema")
.satisfies((obj) -> assertThat(obj.toString()).contains("\"name\":\"UserPojo\""));
avroMessages.add(record.getFields()
.stream()
.map(Field::getName)
.collect(Collectors.toMap(Function.identity(), record::getField)));
avroLatch.countDown();
return Mono.empty();
}

@SuppressWarnings("unchecked")
@ReactivePulsarListener(id = "keyvalueAcListener", topics = KEYVALUE_TOPIC,
schemaType = SchemaType.AUTO_CONSUME, consumerCustomizer = "earliestCustomizer")
Mono<Void> listenKeyvalue(Message<GenericRecord> genericMessage) {
assertThat(genericMessage.getValue().getSchemaType()).isEqualTo(SchemaType.KEY_VALUE);
assertThat(genericMessage.getValue().getNativeObject()).isInstanceOf(KeyValue.class);
var record = (KeyValue<String, Object>) genericMessage.getValue().getNativeObject();
keyValueMessages.add(Map.of(record.getKey(), record.getValue()));
keyValueLatch.countDown();
return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<?> earliestCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.test.support.model;

import java.util.Objects;

/**
* Test object (user) defined via standard Java beans get/set methods.
* <p>
* <b>WARN</b> Do not convert this to a Record as this is used for Avro tests and Avro
* does not work well w/ records yet.
*/
public class UserPojo {

private String name;

private int age;

UserPojo() {
}

public UserPojo(String name, int age) {
this.name = name;
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UserPojo user = (UserPojo) o;
return age == user.age && Objects.equals(name, user.name);
}

@Override
public int hashCode() {
return Objects.hash(name, age);
}

@Override
public String toString() {
return "User{" + "name='" + name + '\'' + ", age=" + age + '}';
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.test.support.model;

/**
* Test object (user) defined via a Java record.
*
* @param name the user's name
* @param age the user's age
*/
public record UserRecord(String name, int age) {
}
Loading

0 comments on commit 92bfe7c

Please sign in to comment.