Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQS Async Client support #242

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.amazon.sqs.javamessaging;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.QueueConnectionFactory;

public abstract class AbstractSQSConnectionFactory implements ConnectionFactory, QueueConnectionFactory {

@Override
public JMSContext createContext() {
throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
}

@Override
public JMSContext createContext(String userName, String password) {
throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
}

@Override
public JMSContext createContext(String userName, String password, int sessionMode) {
throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
}

@Override
public JMSContext createContext(int sessionMode) {
throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.amazon.sqs.javamessaging;

import jakarta.jms.JMSException;
import software.amazon.awssdk.awscore.AwsClient;
import software.amazon.awssdk.services.sqs.model.*;

public interface AmazonSQSMessagingClient {

AwsClient getAmazonSQSClient();

void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSException;

DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException;

SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JMSException;

boolean queueExists(String queueName) throws JMSException;

boolean queueExists(String queueName, String queueOwnerAccountId) throws JMSException;

GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException;

GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccountId) throws JMSException;

GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws JMSException;

CreateQueueResponse createQueue(String queueName) throws JMSException;

CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) throws JMSException;

ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws JMSException;

void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException;

ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch(
ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws JMSException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* <code>AwsServiceException</code> and <code>SdkException</code> into
* JMSException/JMSSecurityException.
*/
public class AmazonSQSMessagingClientWrapper {
public class AmazonSQSMessagingClientWrapper implements AmazonSQSMessagingClient {
private static final Logger LOG = LoggerFactory.getLogger(AmazonSQSMessagingClientWrapper.class);

/**
Expand Down Expand Up @@ -90,6 +90,7 @@ public AmazonSQSMessagingClientWrapper(SqsClient amazonSQSClient, AwsCredentials
*
* @return amazonSQSClient
*/
@Override
public SqsClient getAmazonSQSClient() {
return amazonSQSClient;
}
Expand All @@ -102,6 +103,7 @@ public SqsClient getAmazonSQSClient() {
* deleteMessage service method on SqsClient.
* @throws JMSException
*/
@Override
public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSException {
try {
amazonSQSClient.deleteMessage(prepareRequest(deleteMessageRequest));
Expand All @@ -123,6 +125,7 @@ public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSE
* returned by SqsClient
* @throws JMSException
*/
@Override
public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException {
try {
return amazonSQSClient.deleteMessageBatch(prepareRequest(deleteMessageBatchRequest));
Expand All @@ -141,6 +144,7 @@ public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest d
* SqsClient
* @throws JMSException
*/
@Override
public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JMSException {
try {
return amazonSQSClient.sendMessage(prepareRequest(sendMessageRequest));
Expand All @@ -158,6 +162,7 @@ public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) th
* @return true if the queue exists, false if it doesn't.
* @throws JMSException
*/
@Override
public boolean queueExists(String queueName) throws JMSException {
try {
GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(queueName).build();
Expand All @@ -181,6 +186,7 @@ public boolean queueExists(String queueName) throws JMSException {
* @return true if the queue exists, false if it doesn't.
* @throws JMSException
*/
@Override
public boolean queueExists(String queueName, String queueOwnerAccountId) throws JMSException {
try {
GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder()
Expand All @@ -204,6 +210,7 @@ public boolean queueExists(String queueName, String queueOwnerAccountId) throws
* SqsClient, which will include queue`s URL
* @throws JMSException
*/
@Override
public GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException {
GetQueueUrlRequest request = GetQueueUrlRequest.builder()
.queueName(queueName)
Expand All @@ -220,6 +227,7 @@ public GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException {
* SqsClient, which will include queue`s URL
* @throws JMSException
*/
@Override
public GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccountId) throws JMSException {
GetQueueUrlRequest request = GetQueueUrlRequest.builder()
.queueName(queueName)
Expand All @@ -237,6 +245,7 @@ public GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccoun
* SqsClient, which will include queue`s URL
* @throws JMSException
*/
@Override
public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws JMSException {
try {
return amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest));
Expand All @@ -255,6 +264,7 @@ public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) th
* an existing one.
* @throws JMSException
*/
@Override
public CreateQueueResponse createQueue(String queueName) throws JMSException {
return createQueue(CreateQueueRequest.builder().queueName(queueName).build());
}
Expand All @@ -270,6 +280,7 @@ public CreateQueueResponse createQueue(String queueName) throws JMSException {
* an existing one.
* @throws JMSException
*/
@Override
public CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) throws JMSException {
try {
return amazonSQSClient.createQueue(prepareRequest(createQueueRequest));
Expand All @@ -290,6 +301,7 @@ public CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) th
* by SqsClient.
* @throws JMSException
*/
@Override
public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws JMSException {
try {
return amazonSQSClient.receiveMessage(prepareRequest(receiveMessageRequest));
Expand All @@ -306,6 +318,7 @@ public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessag
* changeMessageVisibility service method on SqsClient.
* @throws JMSException
*/
@Override
public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException {
try {
amazonSQSClient.changeMessageVisibility(prepareRequest(changeMessageVisibilityRequest));
Expand All @@ -325,6 +338,7 @@ public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessage
* method, as returned by SqsClient.
* @throws JMSException
*/
@Override
public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch(
ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws JMSException {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.amazon.sqs.javamessaging;

import jakarta.jms.JMSException;
import jakarta.jms.QueueConnection;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;

import java.util.function.Supplier;

public class SQSAsyncConnectionFactory extends AbstractSQSConnectionFactory {
private final ProviderConfiguration providerConfiguration;
private final Supplier<SqsAsyncClient> amazonSQSClientSupplier;

/**
* Creates a SQSConnectionFactory that uses default ProviderConfiguration
* and SqsClientAsyncBuilder.standard() for creating SqsAsyncClient connections.
* Every SQSConnection will have its own copy of SqsAsyncClient.
*/
public SQSAsyncConnectionFactory() {
this(new ProviderConfiguration());
}

/**
* Creates a SQSConnectionFactory that uses SqsAsyncClientBuilder.standard() for creating SqsAsyncClient connections.
* Every SQSConnection will have its own copy of SqsAsyncClient.
*/
public SQSAsyncConnectionFactory(ProviderConfiguration providerConfiguration) {
this(providerConfiguration, SqsAsyncClient.create());
}

/**
* Creates a SQSConnectionFactory that uses the provided SqsAsyncClient connection.
* Every SQSConnection will use the same provided SqsAsyncClient.
*/
public SQSAsyncConnectionFactory(ProviderConfiguration providerConfiguration, final SqsAsyncClient client) {
if (providerConfiguration == null) {
throw new IllegalArgumentException("Provider configuration cannot be null");
}
if (client == null) {
throw new IllegalArgumentException("AmazonSQS client cannot be null");
}
this.providerConfiguration = providerConfiguration;
this.amazonSQSClientSupplier = () -> client;
}

/**
* Creates a SQSConnectionFactory that uses the provided SqsClientBuilder for creating AmazonSQS client connections.
* Every SQSConnection will have its own copy of AmazonSQS client created through the provided builder.
*/
public SQSAsyncConnectionFactory(ProviderConfiguration providerConfiguration, final SqsAsyncClientBuilder clientBuilder) {
if (providerConfiguration == null) {
throw new IllegalArgumentException("Provider configuration cannot be null");
}
if (clientBuilder == null) {
throw new IllegalArgumentException("AmazonSQS client builder cannot be null");
}
this.providerConfiguration = providerConfiguration;
this.amazonSQSClientSupplier = clientBuilder::build;
}


@Override
public SQSConnection createConnection() throws JMSException {
try {
SqsAsyncClient amazonSQS = amazonSQSClientSupplier.get();
return createConnection(amazonSQS, null);
} catch (RuntimeException e) {
throw (JMSException) new JMSException("Error creating SQS client: " + e.getMessage()).initCause(e);
}
}

@Override
public SQSConnection createConnection(String awsAccessKeyId, String awsSecretKey) throws JMSException {
AwsBasicCredentials basicAWSCredentials = AwsBasicCredentials.create(awsAccessKeyId, awsSecretKey);
return createConnection(basicAWSCredentials);
}

public SQSConnection createConnection(AwsCredentials awsCredentials) throws JMSException {
AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(awsCredentials);
return createConnection(awsCredentialsProvider);
}

public SQSConnection createConnection(AwsCredentialsProvider awsCredentialsProvider) throws JMSException {
try {
SqsAsyncClient amazonSQS = amazonSQSClientSupplier.get();
return createConnection(amazonSQS, awsCredentialsProvider);
} catch (Exception e) {
throw (JMSException) new JMSException("Error creating SQS client: " + e.getMessage()).initCause(e);
}
}

private SQSConnection createConnection(SqsAsyncClient amazonSQS, AwsCredentialsProvider awsCredentialsProvider) throws JMSException {
AmazonSQSAsyncMessagingClientWrapper amazonSQSClientJMSWrapper = new AmazonSQSAsyncMessagingClientWrapper(amazonSQS, awsCredentialsProvider);
return new SQSConnection(amazonSQSClientJMSWrapper, providerConfiguration.getNumberOfMessagesToPrefetch());
}

@Override
public QueueConnection createQueueConnection() throws JMSException {
return createConnection();
}

@Override
public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
return createConnection(userName, password);
}
}
12 changes: 6 additions & 6 deletions src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import jakarta.jms.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.awscore.AwsClient;

import java.util.Collections;
import java.util.Set;
Expand Down Expand Up @@ -84,7 +84,7 @@ public class SQSConnection implements Connection, QueueConnection {
/** Used for interactions with connection state. */
private final Object stateLock = new Object();

private final AmazonSQSMessagingClientWrapper amazonSQSClient;
private final AmazonSQSMessagingClient amazonSQSClient;

/**
* Configures the amount of messages that can be prefetched by a consumer. A
Expand All @@ -106,7 +106,7 @@ public class SQSConnection implements Connection, QueueConnection {

private final Set<Session> sessions = Collections.newSetFromMap(new ConcurrentHashMap<>());

SQSConnection(AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper, int numberOfMessagesToPrefetch) {
SQSConnection(AmazonSQSMessagingClient amazonSQSClientJMSWrapper, int numberOfMessagesToPrefetch) {
amazonSQSClient = amazonSQSClientJMSWrapper;
this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch;

Expand All @@ -116,9 +116,9 @@ public class SQSConnection implements Connection, QueueConnection {
* Get the AmazonSQSClient used by this connection. This can be used to do administrative operations
* that aren't included in the JMS specification, e.g. creating new queues.
*
* @return the SqsClient used by this connection
* @return the AwsClient used by this connection
*/
public SqsClient getAmazonSQSClient() {
public AwsClient getAmazonSQSClient() {
return amazonSQSClient.getAmazonSQSClient();
}

Expand All @@ -130,7 +130,7 @@ public SqsClient getAmazonSQSClient() {
*
* @return wrapped version of the AmazonSQSClient used by this connection
*/
public AmazonSQSMessagingClientWrapper getWrappedAmazonSQSClient() {
public AmazonSQSMessagingClient getWrappedAmazonSQSClient() {
return amazonSQSClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@
*/
package com.amazon.sqs.javamessaging;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.QueueConnection;
import jakarta.jms.QueueConnectionFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand Down Expand Up @@ -48,7 +44,7 @@
* methods.
*/

public class SQSConnectionFactory implements ConnectionFactory, QueueConnectionFactory {
public class SQSConnectionFactory extends AbstractSQSConnectionFactory {
private final ProviderConfiguration providerConfiguration;
private final Supplier<SqsClient> amazonSQSClientSupplier;

Expand Down Expand Up @@ -116,26 +112,6 @@ public SQSConnection createConnection(String awsAccessKeyId, String awsSecretKey
return createConnection(basicAWSCredentials);
}

@Override
public JMSContext createContext() {
throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
}

@Override
public JMSContext createContext(String userName, String password) {
throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
}

@Override
public JMSContext createContext(String userName, String password, int sessionMode) {
throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
}

@Override
public JMSContext createContext(int sessionMode) {
throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
}

public SQSConnection createConnection(AwsCredentials awsCredentials) throws JMSException {
AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(awsCredentials);
return createConnection(awsCredentialsProvider);
Expand Down
Loading