Skip to content

Commit

Permalink
Favor unchecked exceptions in APIs
Browse files Browse the repository at this point in the history
This commit replaces usages of the checked PulsarClientException with
the newly introduced unchecked PulsarException.

Resolves #547
  • Loading branch information
jonas-grgt authored and onobc committed Jan 26, 2024
1 parent 6182d15 commit 1126c46
Show file tree
Hide file tree
Showing 15 changed files with 149 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Spring Pulsar specific {@link NestedRuntimeException} implementation.
*
* @author Soby Chacko
* @author Jonas Geiregat
*/
public class PulsarException extends NestedRuntimeException {

Expand All @@ -33,4 +34,8 @@ public PulsarException(String msg, Throwable cause) {
super(msg, cause);
}

public PulsarException(Throwable cause) {
this(cause.getMessage(), cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,10 @@ protected Producer<T> doCreateProducer(Schema<T> schema, @Nullable String topic,

private Producer<T> createCacheableProducer(Schema<T> schema, String topic,
@Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers) {
try {
var producer = super.doCreateProducer(schema, topic, encryptionKeys, customizers);
return new ProducerWithCloseCallback<>(producer,
(p) -> this.logger.trace(() -> "Client closed producer %s but will skip actual closing"
.formatted(ProducerUtils.formatProducer(producer))));
}
catch (PulsarClientException ex) {
throw new RuntimeException(ex);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.PulsarException;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -57,14 +58,19 @@ public DefaultPulsarClientFactory(PulsarClientBuilderCustomizer customizer) {
}

@Override
public PulsarClient createClient() throws PulsarClientException {
public PulsarClient createClient() {
if (this.useRestartableClient) {
this.logger.info(() -> "Using restartable client");
return new PulsarClientProxy(this.customizer);
}
var clientBuilder = PulsarClient.builder();
this.customizer.customize(clientBuilder);
return clientBuilder.build();
try {
return clientBuilder.build();
}
catch (PulsarClientException ex) {
throw new PulsarException(ex);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;

import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;
import org.springframework.util.CollectionUtils;

/**
Expand All @@ -42,6 +43,7 @@
* @author Alexander Preuß
* @author Christophe Bornet
* @author Chris Bono
* @author Jonas Geiregat
*/
public class DefaultPulsarConsumerFactory<T> implements PulsarConsumerFactory<T> {

Expand All @@ -64,15 +66,23 @@ public DefaultPulsarConsumerFactory(PulsarClient pulsarClient,

@Override
public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics,
@Nullable String subscriptionName, ConsumerBuilderCustomizer<T> customizer) throws PulsarClientException {
return createConsumer(schema, topics, subscriptionName, null,
customizer != null ? Collections.singletonList(customizer) : null);
@Nullable String subscriptionName, ConsumerBuilderCustomizer<T> customizer) {
try {
return createConsumer(schema, topics, subscriptionName, null,
customizer != null ? Collections.singletonList(customizer) : null);
}
catch (PulsarException ex) {
throw ex;
}
catch (Exception ex) {
throw new PulsarException(PulsarClientException.unwrap(ex));
}
}

@Override
public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics,
@Nullable String subscriptionName, @Nullable Map<String, String> metadataProperties,
@Nullable List<ConsumerBuilderCustomizer<T>> customizers) throws PulsarClientException {
@Nullable List<ConsumerBuilderCustomizer<T>> customizers) {
Objects.requireNonNull(schema, "Schema must be specified");
ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);

Expand All @@ -92,7 +102,12 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
}
return consumerBuilder.subscribe();
try {
return consumerBuilder.subscribe();
}
catch (PulsarClientException ex) {
throw new PulsarException(ex);
}
}

private void replaceTopicsOnBuilder(ConsumerBuilder<T> builder, Collection<String> topics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;
import org.springframework.util.CollectionUtils;

/**
Expand Down Expand Up @@ -102,21 +103,37 @@ public DefaultPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String
}

@Override
public Producer<T> createProducer(Schema<T> schema, @Nullable String topic) throws PulsarClientException {
public Producer<T> createProducer(Schema<T> schema, @Nullable String topic) {
return doCreateProducer(schema, topic, null, null);
}

@Override
public Producer<T> createProducer(Schema<T> schema, @Nullable String topic,
@Nullable ProducerBuilderCustomizer<T> customizer) throws PulsarClientException {
return doCreateProducer(schema, topic, null, customizer != null ? Collections.singletonList(customizer) : null);
@Nullable ProducerBuilderCustomizer<T> customizer) {
try {
return doCreateProducer(schema, topic, null,
customizer != null ? Collections.singletonList(customizer) : null);
}
catch (PulsarException ex) {
throw ex;
}
catch (Exception ex) {
throw new PulsarException(PulsarClientException.unwrap(ex));
}
}

@Override
public Producer<T> createProducer(Schema<T> schema, @Nullable String topic,
@Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers)
throws PulsarClientException {
return doCreateProducer(schema, topic, encryptionKeys, customizers);
@Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers) {
try {
return doCreateProducer(schema, topic, encryptionKeys, customizers);
}
catch (PulsarException ex) {
throw ex;
}
catch (Exception ex) {
throw new PulsarException(PulsarClientException.unwrap(ex));
}
}

/**
Expand All @@ -134,8 +151,7 @@ public Producer<T> createProducer(Schema<T> schema, @Nullable String topic,
* @throws PulsarClientException if any error occurs
*/
protected Producer<T> doCreateProducer(Schema<T> schema, @Nullable String topic,
@Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers)
throws PulsarClientException {
@Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers) {
Objects.requireNonNull(schema, "Schema must be specified");
var resolvedTopic = resolveTopicName(topic);
this.logger.trace(() -> "Creating producer for '%s' topic".formatted(resolvedTopic));
Expand All @@ -156,7 +172,12 @@ protected Producer<T> doCreateProducer(Schema<T> schema, @Nullable String topic,
}
producerBuilder.topic(resolvedTopic);

return producerBuilder.create();
try {
return producerBuilder.create();
}
catch (PulsarClientException ex) {
throw new PulsarException(ex);
}
}

protected String resolveTopicName(String userSpecifiedTopic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package org.springframework.pulsar.core;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

import org.springframework.pulsar.PulsarException;

/**
* Pulsar client factory interface.
Expand All @@ -30,8 +31,8 @@ public interface PulsarClientFactory {
/**
* Create a client.
* @return the created client instance
* @throws PulsarClientException if an error occurs creating the client
* @throws PulsarException if an error occurs creating the client
*/
PulsarClient createClient() throws PulsarClientException;
PulsarClient createClient();

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.client.api.Schema;

import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;

/**
* Pulsar consumer factory interface.
Expand All @@ -34,6 +35,7 @@
* @author Soby Chacko
* @author Christophe Bornet
* @author Chris Bono
* @author Jonas Geiregat
*/
public interface PulsarConsumerFactory<T> {

Expand All @@ -53,10 +55,11 @@ public interface PulsarConsumerFactory<T> {
* that the customizer is applied last and has the potential for overriding any
* specified parameters or default properties.
* @return the consumer
* @throws PulsarClientException if any error occurs
* @throws PulsarException if any {@link PulsarClientException} occurs communicating
* with Pulsar
*/
Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics, @Nullable String subscriptionName,
ConsumerBuilderCustomizer<T> customizer) throws PulsarClientException;
ConsumerBuilderCustomizer<T> customizer);

/**
* Create a consumer.
Expand All @@ -79,10 +82,10 @@ Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics
* builder. Note that the customizers are applied last and have the potential for
* overriding any specified parameters or default properties.
* @return the consumer
* @throws PulsarClientException if any error occurs
* @throws PulsarException if any {@link PulsarClientException} occurs communicating
* with Pulsar
*/
Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics, @Nullable String subscriptionName,
@Nullable Map<String, String> metadataProperties, @Nullable List<ConsumerBuilderCustomizer<T>> customizers)
throws PulsarClientException;
@Nullable Map<String, String> metadataProperties, @Nullable List<ConsumerBuilderCustomizer<T>> customizers);

}
Loading

0 comments on commit 1126c46

Please sign in to comment.