From ad9d4418c458b71d2f65db95757f076746da1da2 Mon Sep 17 00:00:00 2001 From: royalhuang Date: Thu, 22 Sep 2022 22:31:42 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=BC=95=E6=93=8E=E7=AD=89MQ=E5=9C=BA?= =?UTF-8?q?=E6=99=AF=E6=8E=A5=E5=85=A5SCS=E6=A1=86=E6=9E=B6=20#7443?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../StreamBindingEnvironmentPostProcessor.kt | 36 ++++++------------- .../devops/common/stream/pojo/IEvent.kt | 1 + .../devops/common/stream/pojo/SampleEvent.kt | 36 ------------------- ...ingHandlerMappingsProviderConfiguration.kt | 6 ++++ 4 files changed, 17 insertions(+), 62 deletions(-) delete mode 100644 src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/SampleEvent.kt diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/StreamBindingEnvironmentPostProcessor.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/StreamBindingEnvironmentPostProcessor.kt index 87519367214..2de1f87abf1 100644 --- a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/StreamBindingEnvironmentPostProcessor.kt +++ b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/StreamBindingEnvironmentPostProcessor.kt @@ -34,7 +34,6 @@ import org.reflections.scanners.Scanners import org.reflections.util.ClasspathHelper import org.reflections.util.ConfigurationBuilder import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Value import org.springframework.boot.SpringApplication import org.springframework.boot.context.config.ConfigDataEnvironmentPostProcessor import org.springframework.boot.env.EnvironmentPostProcessor @@ -43,18 +42,8 @@ import org.springframework.core.env.ConfigurableEnvironment import org.springframework.core.env.PropertiesPropertySource import java.util.Properties -// TODO #7443 class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered { - @Value("\${spring.cloud.stream.default-binder:#{null}}") - private val defaultBinder: String? = null - - @Value("\${spring.cloud.stream.service-binder:#{null}}") - private val serviceBinder: String? = null - - @Value("\${spring.application.name:#{null}}") - private val serviceName: String? = null - override fun postProcessEnvironment(environment: ConfigurableEnvironment, application: SpringApplication) { environment.propertySources.addLast(createPropertySource()) } @@ -63,7 +52,7 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered with(Properties()) { // 如果未配置服务使用的binder类型,则使用全局默认binder类型 // 如果均未配置则不进行注解的反射解析 - val binder = serviceBinder ?: defaultBinder ?: return PropertiesPropertySource(STREAM_SOURCE_NAME, this) + val binder = "\${spring.cloud.stream.service-binder}" val definition = mutableListOf() val eventClasses = Reflections( ConfigurationBuilder() @@ -77,8 +66,9 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered "with destination[${streamEvent.destination}]" ) definition.add(clazz.simpleName) - setProperty("spring.cloud.stream.bindings.${clazz.simpleName}.destination", streamEvent.destination) - setProperty("spring.cloud.stream.bindings.${clazz.simpleName}.destination", binder) + val prefix = "spring.cloud.stream.bindings.${clazz.simpleName}" + setProperty("$prefix.destination", streamEvent.destination) + setProperty("$prefix.binder", binder) } val consumerBeans = Reflections( ConfigurationBuilder() @@ -95,17 +85,11 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered definition.add(method.name) // 如果注解中指定了订阅组,则直接设置 // 如果未指定则取当前服务名作为订阅组,保证所有分布式服务再同一个组内 - val subscriptionGroup = streamConsumer.group.ifBlank { - serviceName ?: "default" - } - setProperty( - "spring.cloud.stream.bindings.${method.name}-in-0", - streamConsumer.destination - ) - setProperty( - "spring.cloud.stream.bindings.bindings.${method.name}-in-0", - subscriptionGroup - ) + val subscriptionGroup = "\${spring.application.name}" + val prefix = "spring.cloud.stream.bindings.${method.name}-in-0" + setProperty("$prefix.destination", streamConsumer.destination) + setProperty("$prefix.group", subscriptionGroup) + setProperty("$prefix.binder", binder) } setProperty("spring.cloud.stream.function.definition", definition.joinToString(";")) return PropertiesPropertySource(STREAM_SOURCE_NAME, this) @@ -113,7 +97,7 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered } override fun getOrder(): Int { - return ConfigDataEnvironmentPostProcessor.ORDER - 1 + return ConfigDataEnvironmentPostProcessor.ORDER + 1 } companion object { diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/IEvent.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/IEvent.kt index 354723c8832..24e65fb445d 100644 --- a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/IEvent.kt +++ b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/IEvent.kt @@ -49,6 +49,7 @@ open class IEvent( fun sendTo(bridge: StreamBridge) { try { val eventType = this::class.java.annotations.find { s -> s is StreamEvent } as StreamEvent + // TODO #7443 bridge.send(eventType.destination, buildMessage(eventType.delayMills)) } catch (ignored: Exception) { logger.error("[STREAM MQ] Fail to dispatch the event($this)", ignored) diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/SampleEvent.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/SampleEvent.kt deleted file mode 100644 index 3bf94872d44..00000000000 --- a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/SampleEvent.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. - * - * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. - * - * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. - * - * A copy of the MIT License is included in this file. - * - * - * Terms of the MIT License: - * --------------------------------------------------- - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the - * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to - * permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of - * the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT - * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN - * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE - * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -package com.tencent.devops.common.stream.pojo - -import com.tencent.devops.common.stream.annotation.StreamEvent -@StreamEvent("a.b.c") -data class SampleEvent( - val buildId: String, - var retryTime: Int = 2, - var delayMills: Int = 0 -) diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pulsar/config/ExtendedBindingHandlerMappingsProviderConfiguration.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pulsar/config/ExtendedBindingHandlerMappingsProviderConfiguration.kt index 6e1f9cfa808..b0038a9f62a 100644 --- a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pulsar/config/ExtendedBindingHandlerMappingsProviderConfiguration.kt +++ b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pulsar/config/ExtendedBindingHandlerMappingsProviderConfiguration.kt @@ -27,6 +27,7 @@ package com.tencent.devops.common.stream.pulsar.config +import com.tencent.devops.common.stream.config.StreamBindingEnvironmentPostProcessor import com.tencent.devops.common.stream.pulsar.convert.PulsarMessageConverter import com.tencent.devops.common.stream.pulsar.custom.PulsarConfigBeanPostProcessor import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean @@ -53,6 +54,11 @@ class ExtendedBindingHandlerMappingsProviderConfiguration { return PulsarConfigBeanPostProcessor() } + @Bean + fun streamBindingEnvironmentPostProcessor(): StreamBindingEnvironmentPostProcessor { + return StreamBindingEnvironmentPostProcessor() + } + /** * if you want to customize a bean, please use this BeanName `PulsarMessageConverter.DEFAULT_NAME`. */