From 2a63147db1412895240bd84a332f1dbbcbdfd694 Mon Sep 17 00:00:00 2001 From: Xiaolu Dai Date: Tue, 19 Oct 2021 22:46:33 +0800 Subject: [PATCH 1/2] fix the configuration of eventhub processor client --- sdk/spring/pom.xml | 1 + .../context/AzureContextUtils.java | 6 + ...AzureBlobCheckpointStoreConfiguration.java | 23 +++- .../AzureStorageBlobAutoConfiguration.java | 10 +- ...eBlobCheckpointStoreConfigurationTest.java | 106 ++++++++++++++++++ .../pom.xml | 1 - 6 files changed, 140 insertions(+), 7 deletions(-) create mode 100644 sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfigurationTest.java diff --git a/sdk/spring/pom.xml b/sdk/spring/pom.xml index 43d8455f6d5f8..774930834bf93 100644 --- a/sdk/spring/pom.xml +++ b/sdk/spring/pom.xml @@ -204,6 +204,7 @@ ../../eng/code-quality-reports ../keyvault/azure-security-keyvault-jca + azure-spring-boot-test-core spring-messaging-azure spring-messaging-azure-eventhubs spring-messaging-azure-servicebus diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/context/AzureContextUtils.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/context/AzureContextUtils.java index 4e227c614dcf6..8386bf5470494 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/context/AzureContextUtils.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/context/AzureContextUtils.java @@ -39,4 +39,10 @@ public abstract class AzureContextUtils { public static final String AZURE_GLOBAL_PROPERTY_BEAN_NAME = "AZURE_GLOBAL_PROPERTY_BEAN_NAME"; + public static final String EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME = + "EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME"; + + public static final String STORAGE_BLOB_CLIENT_BUILDER_FACTORY_BEAN_NAME = + "STORAGE_BLOB_CLIENT_BUILDER_FACTORY_BEAN_NAME"; + } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfiguration.java index db2313222b725..0a88d7f564f26 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfiguration.java @@ -7,33 +7,50 @@ import com.azure.spring.cloud.autoconfigure.eventhubs.properties.AzureEventHubProperties; import com.azure.spring.cloud.autoconfigure.storage.blob.BlobServiceClientBuilderFactory; import com.azure.storage.blob.BlobContainerAsyncClient; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.time.Duration; + +import static com.azure.spring.cloud.autoconfigure.context.AzureContextUtils.EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME; + /** * Configures a {@link BlobCheckpointStore} */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(BlobCheckpointStore.class) +@ConditionalOnProperty(prefix = "spring.cloud.azure.eventhubs.processor.checkpoint-store", name = { "container-name", "account-name" }) public class AzureBlobCheckpointStoreConfiguration { @Bean @ConditionalOnMissingBean - @ConditionalOnProperty("spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name") - public BlobCheckpointStore blobCheckpointStore(AzureEventHubProperties eventHubProperties) { + public BlobCheckpointStore blobCheckpointStore(@Qualifier(EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME) + BlobServiceClientBuilderFactory blobServiceClientBuilderFactory, + AzureEventHubProperties eventHubProperties) { final AzureEventHubProperties.Processor.BlobCheckpointStore checkpointStoreProperties = eventHubProperties .getProcessor() .getCheckpointStore(); - final BlobContainerAsyncClient blobContainerAsyncClient = new BlobServiceClientBuilderFactory(checkpointStoreProperties) + final BlobContainerAsyncClient blobContainerAsyncClient = blobServiceClientBuilderFactory .build() .buildAsyncClient() .getBlobContainerAsyncClient(checkpointStoreProperties.getContainerName()); + if (Boolean.FALSE.equals(blobContainerAsyncClient.exists().block(Duration.ofSeconds(3)))) { + blobContainerAsyncClient.create().block(Duration.ofSeconds(3)); + } + return new BlobCheckpointStore(blobContainerAsyncClient); } + @Bean(EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME) + @ConditionalOnMissingBean(name = EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME) + public BlobServiceClientBuilderFactory eventHubProcessorBlobServiceClientBuilderFactory(AzureEventHubProperties eventHubProperties) { + return new BlobServiceClientBuilderFactory(eventHubProperties.getProcessor().getCheckpointStore()); + } + } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/storage/blob/AzureStorageBlobAutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/storage/blob/AzureStorageBlobAutoConfiguration.java index 7f0210b9e4b5d..e7c0ee6a2f58c 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/storage/blob/AzureStorageBlobAutoConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/storage/blob/AzureStorageBlobAutoConfiguration.java @@ -13,12 +13,15 @@ import com.azure.storage.blob.BlobServiceAsyncClient; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.BlobServiceClientBuilder; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; +import static com.azure.spring.cloud.autoconfigure.context.AzureContextUtils.STORAGE_BLOB_CLIENT_BUILDER_FACTORY_BEAN_NAME; + /** * Auto-configuration for a {@link BlobServiceClientBuilder} and blob service clients. */ @@ -81,15 +84,16 @@ public BlobServiceClient blobServiceClient(BlobServiceClientBuilder builder) { return builder.buildClient(); } - @Bean - @ConditionalOnMissingBean + @Bean(STORAGE_BLOB_CLIENT_BUILDER_FACTORY_BEAN_NAME) + @ConditionalOnMissingBean(name = STORAGE_BLOB_CLIENT_BUILDER_FACTORY_BEAN_NAME) public BlobServiceClientBuilderFactory blobServiceClientBuilderFactory(AzureStorageBlobProperties properties) { return new BlobServiceClientBuilderFactory(properties); } @Bean @ConditionalOnMissingBean - public BlobServiceClientBuilder blobServiceClientBuilder(BlobServiceClientBuilderFactory factory) { + public BlobServiceClientBuilder blobServiceClientBuilder(@Qualifier(STORAGE_BLOB_CLIENT_BUILDER_FACTORY_BEAN_NAME) + BlobServiceClientBuilderFactory factory) { return factory.build(); } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfigurationTest.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfigurationTest.java new file mode 100644 index 0000000000000..c96ff703df610 --- /dev/null +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfigurationTest.java @@ -0,0 +1,106 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.spring.cloud.autoconfigure.eventhubs; + +import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; +import com.azure.spring.cloud.autoconfigure.context.AzureContextUtils; +import com.azure.spring.cloud.autoconfigure.eventhubs.properties.AzureEventHubProperties; +import com.azure.spring.cloud.autoconfigure.properties.AzureGlobalProperties; +import com.azure.spring.cloud.autoconfigure.storage.blob.AzureStorageBlobAutoConfiguration; +import com.azure.spring.cloud.autoconfigure.storage.blob.BlobServiceClientBuilderFactory; +import com.azure.storage.blob.BlobContainerAsyncClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import org.assertj.core.api.Condition; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.FilteredClassLoader; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +class AzureBlobCheckpointStoreConfigurationTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(AzureBlobCheckpointStoreConfiguration.class)); + + + @Test + void configureWithoutBlobCheckpointStore() { + this.contextRunner + .withClassLoader(new FilteredClassLoader(BlobCheckpointStore.class)) + .withPropertyValues( + "spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=abc", + "spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=sa" + ) + .run(context -> assertThat(context).doesNotHaveBean(AzureBlobCheckpointStoreConfiguration.class)); + } + + @Test + void configureWithoutContainerName() { + this.contextRunner + .withPropertyValues("spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=sa") + .run(context -> assertThat(context).doesNotHaveBean(AzureBlobCheckpointStoreConfiguration.class)); + } + + + @Test + void configureWithoutAccountName() { + this.contextRunner + .withPropertyValues("spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=abc") + .run(context -> assertThat(context).doesNotHaveBean(AzureBlobCheckpointStoreConfiguration.class)); + } + + @Test + void configureWithStorageInfo() { + AzureEventHubProperties azureEventHubProperties = new AzureEventHubProperties(); + azureEventHubProperties.getProcessor().getCheckpointStore().setAccountName("sa"); + azureEventHubProperties.getProcessor().getCheckpointStore().setContainerName("abc"); + this.contextRunner + .withPropertyValues( + "spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=abc", + "spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=sa" + ) + .withBean(AzureEventHubProperties.class, () -> azureEventHubProperties) + .withBean(BlobCheckpointStore.class, () -> mock(BlobCheckpointStore.class)) + .run(context -> { + assertThat(context).hasSingleBean(AzureBlobCheckpointStoreConfiguration.class); + assertThat(context).hasSingleBean(BlobServiceClientBuilderFactory.class); + assertThat(context).hasBean(AzureContextUtils.EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME); + }); + } + + @Test + void shouldWorkWithStorageClientConfiguration() { + AzureEventHubProperties azureEventHubProperties = new AzureEventHubProperties(); + azureEventHubProperties.getProcessor().getCheckpointStore().setAccountName("sa"); + azureEventHubProperties.getProcessor().getCheckpointStore().setContainerName("abc"); + + this.contextRunner + .withUserConfiguration(AzureStorageBlobAutoConfiguration.class) + .withPropertyValues( + "spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=abc", + "spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=sa", + "spring.cloud.azure.storage.blob.account-name=sa", + "spring.cloud.azure.storage.blob.container-name=abc" + ) + .withBean(AzureEventHubProperties.class, () -> azureEventHubProperties) + .withBean(AzureGlobalProperties.class, AzureGlobalProperties::new) + .withBean(BlobCheckpointStore.class, () -> mock(BlobCheckpointStore.class)) + .run(context -> { + assertThat(context).hasSingleBean(AzureBlobCheckpointStoreConfiguration.class); + assertThat(context).hasSingleBean(AzureStorageBlobAutoConfiguration.class); + assertThat(context).hasBean(AzureContextUtils.EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME); + assertThat(context).hasBean(AzureContextUtils.STORAGE_BLOB_CLIENT_BUILDER_FACTORY_BEAN_NAME); + + assertThat(context).hasSingleBean(BlobServiceClientBuilder.class); + assertThat(context).hasSingleBean(BlobContainerAsyncClient.class); + + assertThat(context).has(new Condition<>(c -> { + String[] beanNamesForType = c.getBeanNamesForType(BlobServiceClientBuilderFactory.class); + return beanNamesForType.length == 2; + }, "There should be two beans of type BlobServiceClientBuilderFactory")); + }); + } +} diff --git a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/pom.xml b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/pom.xml index 17a447110ab69..c61eb83202e93 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/pom.xml +++ b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/pom.xml @@ -39,7 +39,6 @@ com.azure azure-messaging-eventhubs-checkpointstore-blob 1.10.1 - true org.springframework.boot From 808ab61bb89c3cf80d2cc948e581fa6c7545f405 Mon Sep 17 00:00:00 2001 From: Xiaolu Dai Date: Wed, 20 Oct 2021 11:53:26 +0800 Subject: [PATCH 2/2] use BlobCheckpointStoreContainerInitializer interface to allow configuring the initialization of blob container --- ...AzureBlobCheckpointStoreConfiguration.java | 24 +++++++++++++++---- ...obCheckpointStoreContainerInitializer.java | 16 +++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) create mode 100644 sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/BlobCheckpointStoreContainerInitializer.java diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfiguration.java index 0a88d7f564f26..66b214084472a 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/AzureBlobCheckpointStoreConfiguration.java @@ -7,6 +7,9 @@ import com.azure.spring.cloud.autoconfigure.eventhubs.properties.AzureEventHubProperties; import com.azure.spring.cloud.autoconfigure.storage.blob.BlobServiceClientBuilderFactory; import com.azure.storage.blob.BlobContainerAsyncClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -26,11 +29,14 @@ @ConditionalOnProperty(prefix = "spring.cloud.azure.eventhubs.processor.checkpoint-store", name = { "container-name", "account-name" }) public class AzureBlobCheckpointStoreConfiguration { + private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobCheckpointStoreConfiguration.class); + @Bean @ConditionalOnMissingBean public BlobCheckpointStore blobCheckpointStore(@Qualifier(EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME) BlobServiceClientBuilderFactory blobServiceClientBuilderFactory, - AzureEventHubProperties eventHubProperties) { + AzureEventHubProperties eventHubProperties, + ObjectProvider initializers) { final AzureEventHubProperties.Processor.BlobCheckpointStore checkpointStoreProperties = eventHubProperties .getProcessor() .getCheckpointStore(); @@ -40,13 +46,23 @@ public BlobCheckpointStore blobCheckpointStore(@Qualifier(EVENT_HUB_PROCESSOR_CH .buildAsyncClient() .getBlobContainerAsyncClient(checkpointStoreProperties.getContainerName()); - if (Boolean.FALSE.equals(blobContainerAsyncClient.exists().block(Duration.ofSeconds(3)))) { - blobContainerAsyncClient.create().block(Duration.ofSeconds(3)); - } + initializers.ifAvailable(initializer -> initializer.init(blobContainerAsyncClient)); return new BlobCheckpointStore(blobContainerAsyncClient); } + @Bean + @ConditionalOnMissingBean + public BlobCheckpointStoreContainerInitializer blobCheckpointStoreContainerInitializer() { + return containerAsyncClient -> { + if (Boolean.FALSE.equals(containerAsyncClient.exists().block(Duration.ofSeconds(3)))) { + LOGGER.debug("The checkpoint store container [{}] doesn't exist, will create the blob container now.", + containerAsyncClient.getBlobContainerName()); + containerAsyncClient.create().block(Duration.ofSeconds(3)); + } + }; + } + @Bean(EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME) @ConditionalOnMissingBean(name = EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME) public BlobServiceClientBuilderFactory eventHubProcessorBlobServiceClientBuilderFactory(AzureEventHubProperties eventHubProperties) { diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/BlobCheckpointStoreContainerInitializer.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/BlobCheckpointStoreContainerInitializer.java new file mode 100644 index 0000000000000..e4a3d38b2291e --- /dev/null +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/eventhubs/BlobCheckpointStoreContainerInitializer.java @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.spring.cloud.autoconfigure.eventhubs; + +import com.azure.storage.blob.BlobContainerAsyncClient; + +/** + * Interface to be implemented in order to configure the BlobCheckpointStore's container programmatically. + * + */ +public interface BlobCheckpointStoreContainerInitializer { + + void init(BlobContainerAsyncClient containerAsyncClient); + +}