diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/processor/StreamBindingEnvironmentPostProcessor.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/processor/StreamBindingEnvironmentPostProcessor.kt index 01876eceb46..457b58b05ec 100644 --- a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/processor/StreamBindingEnvironmentPostProcessor.kt +++ b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/processor/StreamBindingEnvironmentPostProcessor.kt @@ -94,10 +94,7 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered // 反射扫描所有带有 StreamConsumer 注解的bean方法 consumerMethodBeans.forEach { method -> val consumer = method.getAnnotation(StreamEventConsumer::class.java) - val bindingName = DefaultBindingUtils.getInBindingName( - method = method, - suffix = if (consumer.separately) hostName else null - ) + val bindingName = DefaultBindingUtils.getInBindingName(method) logger.info( "Found StreamConsumer method: ${method.name}, bindingName[$bindingName], " + "with destination[${consumer.destination}], group[${consumer.group}]" @@ -105,7 +102,7 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered definition.add(bindingName) // 如果注解中指定了订阅组,则直接设置 // 如果未指定则取当前服务名作为订阅组,保证所有分布式服务再同一个组内 - setBindings(bindingName, consumer) + setBindings(bindingName, consumer, hostName) } // 反射扫描所有带有 StreamConsumer 注解的bean类型 @@ -117,10 +114,7 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered ).getTypesAnnotatedWith(StreamEventConsumer::class.java) consumerClassBeans.forEach { clazz -> val consumer = clazz.getAnnotation(StreamEventConsumer::class.java) - val bindingName = DefaultBindingUtils.getInBindingName( - clazz = clazz, - suffix = if (consumer.separately) hostName else null - ) + val bindingName = DefaultBindingUtils.getInBindingName(clazz) logger.info( "Found StreamConsumer class: ${clazz.name}, bindingName[$bindingName], " + "with destination[${consumer.destination}], " + @@ -129,7 +123,7 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered definition.add(bindingName) // 如果注解中指定了订阅组,则直接设置 // 如果未指定则取当前服务名作为订阅组,保证所有分布式服务再同一个组内 - setBindings(bindingName, consumer) + setBindings(bindingName, consumer, hostName) } // 声明所有扫描结果的函数式声明 @@ -138,15 +132,19 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered } } - private fun Properties.setBindings(bindingName: String, streamEventConsumer: StreamEventConsumer) { + private fun Properties.setBindings( + bindingName: String, + consumer: StreamEventConsumer, + hostName: String + ) { val bindingPrefix = "spring.cloud.stream.bindings.$bindingName-in-0" val rabbitPropPrefix = "spring.cloud.stream.rabbit.bindings.$bindingName-in-0" - setProperty("$bindingPrefix.destination", streamEventConsumer.destination) - setProperty("$bindingPrefix.group", streamEventConsumer.group) + setProperty("$bindingPrefix.destination", consumer.destination) + setProperty("$bindingPrefix.group", if (consumer.separately) hostName else consumer.group) setProperty("$rabbitPropPrefix.consumer.delayedExchange", "true") setProperty( "$rabbitPropPrefix.consumer.consumerTagPrefix", - "\${spring.application.name}-${streamEventConsumer.group}-\${spring.cloud.stream.instance-index}" + "\${spring.application.name}-${consumer.group}-\${spring.cloud.stream.instance-index}" ) } diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/utils/DefaultBindingUtils.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/utils/DefaultBindingUtils.kt index a3a308450b6..e2dc3fc7462 100644 --- a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/utils/DefaultBindingUtils.kt +++ b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/utils/DefaultBindingUtils.kt @@ -1,25 +1,12 @@ package com.tencent.devops.common.stream.utils -import com.tencent.devops.common.api.util.UUIDUtil import java.lang.reflect.Method object DefaultBindingUtils { fun getOutBindingName(clazz: Class<*>) = clazz.simpleName.decapitalize() - fun getInBindingName(method: Method, suffix: String?): String { - return if (suffix.isNullOrBlank()) { - method.name - } else { - "${method.name}-$suffix" - } - } + fun getInBindingName(method: Method): String = method.name - fun getInBindingName(clazz: Class<*>, suffix: String?): String { - return if (suffix.isNullOrBlank()) { - clazz.simpleName.decapitalize() - } else { - "${clazz.simpleName.decapitalize()}-$suffix" - } - } + fun getInBindingName(clazz: Class<*>) = clazz.simpleName.decapitalize() }