From 13022d785b366a5e2729bc8132d0b380d514173b Mon Sep 17 00:00:00 2001 From: royalhuang Date: Wed, 14 Sep 2022 15:27:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=BC=95=E6=93=8E=E7=AD=89MQ=E5=9C=BA?= =?UTF-8?q?=E6=99=AF=E6=8E=A5=E5=85=A5SCS=E6=A1=86=E6=9E=B6=20#7443?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../stream/annotation/StreamConsumer.kt | 4 +- .../common/stream/annotation/StreamEvent.kt | 7 +- .../StreamBindingEnvironmentPostProcessor.kt | 65 +++++++++++-------- .../devops/common/stream/pojo/IEvent.kt | 2 +- .../devops/common/stream/pojo/SampleEvent.kt | 36 ++++++++++ .../pulsar/util/PulsarTopicUtilsTest.kt | 17 +++-- .../log/configuration/LogMQConfiguration.kt | 7 +- .../devops/log/util/IndexNameUtilsTest.kt | 39 ++++++++++- 8 files changed, 134 insertions(+), 43 deletions(-) create mode 100644 src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/SampleEvent.kt diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/annotation/StreamConsumer.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/annotation/StreamConsumer.kt index 91fa86785b6..1f601deba9a 100644 --- a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/annotation/StreamConsumer.kt +++ b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/annotation/StreamConsumer.kt @@ -31,13 +31,13 @@ import org.springframework.context.annotation.Bean /** * Stream消费者注解 - * @param destination 发送目标绑定,用于指定RabbitMQ的Exchange,Kafka和Pulsar的Topic + * @param streamEvent 目标绑定接受的事件 * @param group 指定订阅组,如果是广播事件则需要指定,否则为非广播默认订阅组 */ @Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION) @Retention(AnnotationRetention.RUNTIME) @Bean annotation class StreamConsumer( - val destination: String, + val streamEvent: StreamEvent, val group: String = "" ) diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/annotation/StreamEvent.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/annotation/StreamEvent.kt index 1b8b033ae92..b8c59545c3b 100644 --- a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/annotation/StreamEvent.kt +++ b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/annotation/StreamEvent.kt @@ -29,14 +29,13 @@ package com.tencent.devops.common.stream.annotation /** * Stream事件注解 - * @param outBinding 发送目标绑定,用于指定RabbitMQ的Exchange,Kafka和Pulsar的Topic + * @param destination 发送目标绑定,用于指定RabbitMQ的Exchange,Kafka和Pulsar的Topic * @param fanout 是否为广播事件 * @param delayMills 延迟时间,如果为延迟事件则必填 */ -@Target(AnnotationTarget.FILE) +@Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.RUNTIME) annotation class StreamEvent( - val outBinding: String, - val fanout: Boolean = false, + val destination: String, val delayMills: Int = 0 ) diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/StreamBindingEnvironmentPostProcessor.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/StreamBindingEnvironmentPostProcessor.kt index 744c0556b4b..ce8ace57eda 100644 --- a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/StreamBindingEnvironmentPostProcessor.kt +++ b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/config/StreamBindingEnvironmentPostProcessor.kt @@ -29,6 +29,10 @@ package com.tencent.devops.common.stream.config import com.tencent.devops.common.stream.annotation.StreamConsumer import com.tencent.devops.common.stream.annotation.StreamEvent +import org.reflections.Reflections +import org.reflections.scanners.Scanners +import org.reflections.util.ClasspathHelper +import org.reflections.util.ConfigurationBuilder import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.boot.SpringApplication @@ -36,12 +40,11 @@ import org.springframework.boot.context.config.ConfigDataEnvironmentPostProcesso import org.springframework.boot.env.EnvironmentPostProcessor import org.springframework.core.Ordered import org.springframework.core.env.ConfigurableEnvironment -import org.springframework.core.env.MapPropertySource import org.springframework.core.env.PropertiesPropertySource import java.util.Properties // TODO #7443 -class StreamBindingEnvironmentPostProcessor: EnvironmentPostProcessor, Ordered { +class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered { @Value("\${spring.cloud.stream.default-binder:#{null}}") private val defaultBinder: String? = null @@ -60,45 +63,53 @@ class StreamBindingEnvironmentPostProcessor: EnvironmentPostProcessor, Ordered { with(Properties()) { // 如果未配置服务使用的binder类型,则使用全局默认binder类型 // 如果均未配置则不进行注解的反射解析 - val binder = serviceBinder ?: defaultBinder ?: return - val ref = Reflections() - val propertySources = applicationContext.environment.propertySources - ref.getTypesAnnotatedWith(StreamEvent::class.java).forEach { clazz -> + val binder = serviceBinder ?: defaultBinder ?: return PropertiesPropertySource(STREAM_SOURCE_NAME, this) + val eventClasses = Reflections( + ConfigurationBuilder() + .addUrls(ClasspathHelper.forPackage("com.tencent.devops")) + .setExpandSuperTypes(true) + ).getTypesAnnotatedWith(StreamEvent::class.java) + eventClasses.forEach { clazz -> val streamEvent = clazz.getAnnotation(StreamEvent::class.java) - logger.info("Found StreamEvent class: ${clazz.name}, " + - "with destination[${streamEvent.destination}]") - propertySources.addLast( - MapPropertySource(streamEvent.destination, mapOf( - "destination" to streamEvent.destination, - "binder" to binder - )) + logger.info( + "Found StreamEvent class: ${clazz.name}, " + + "with destination[${streamEvent.destination}]" ) + setProperty("spring.cloud.stream.bindings.${clazz.simpleName}.destination", streamEvent.destination) + setProperty("spring.cloud.stream.bindings.${clazz.simpleName}.destination", binder) } - ref.getTypesAnnotatedWith(StreamConsumer::class.java).forEach { clazz -> - val streamConsumer = clazz.getAnnotation(StreamConsumer::class.java) - println("Found StreamConsumer class: ${clazz.name}, " + - "with destination[${streamConsumer.destination}] group[${streamConsumer.group}]") + val consumerBeans = Reflections( + ConfigurationBuilder() + .addUrls(ClasspathHelper.forPackage("com.tencent.devops")) + .setExpandSuperTypes(true) + .setScanners(Scanners.MethodsAnnotated) + ).getTypesAnnotatedWith(StreamConsumer::class.java) + consumerBeans.forEach { method -> + val streamConsumer = method.getAnnotation(StreamConsumer::class.java) + println( + "Found StreamConsumer class: ${method.name}, " + + "with destination[${streamConsumer.streamEvent}] group[${streamConsumer.group}]" + ) // 如果注解中指定了订阅组,则直接设置 // 如果未指定则取当前服务名作为订阅组,保证所有分布式服务再同一个组内 val subscriptionGroup = streamConsumer.group.ifBlank { - serviceName?.let { "$it-service" } ?: "default" + serviceName ?: "default" } - propertySources.addLast( - MapPropertySource(clazz.simpleName, mapOf( - "destination" to streamConsumer.destination, - "group" to subscriptionGroup, - "binder" to binder - )) + setProperty( + "spring.cloud.stream.bindings.${method.name}-in-0", + streamConsumer.streamEvent.destination + ) + setProperty( + "spring.cloud.stream.bindings.bindings.${method.name}-in-0", + subscriptionGroup ) } - setProperty("spring.cloud.consul.config.name", "") - setProperty("spring.cloud.consul.config.name", "") return PropertiesPropertySource(STREAM_SOURCE_NAME, this) } } override fun getOrder(): Int { - return ConfigDataEnvironmentPostProcessor.ORDER - 1 + return ConfigDataEnvironmentPostProcessor.ORDER - 1 } companion object { diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/IEvent.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/IEvent.kt index f4fca67ddfe..354723c8832 100644 --- a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/IEvent.kt +++ b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/IEvent.kt @@ -49,7 +49,7 @@ open class IEvent( fun sendTo(bridge: StreamBridge) { try { val eventType = this::class.java.annotations.find { s -> s is StreamEvent } as StreamEvent - bridge.send(eventType.outBinding, buildMessage(eventType.delayMills)) + bridge.send(eventType.destination, buildMessage(eventType.delayMills)) } catch (ignored: Exception) { logger.error("[STREAM MQ] Fail to dispatch the event($this)", ignored) } diff --git a/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/SampleEvent.kt b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/SampleEvent.kt new file mode 100644 index 00000000000..3bf94872d44 --- /dev/null +++ b/src/backend/ci/core/common/common-stream/src/main/kotlin/com/tencent/devops/common/stream/pojo/SampleEvent.kt @@ -0,0 +1,36 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.devops.common.stream.pojo + +import com.tencent.devops.common.stream.annotation.StreamEvent +@StreamEvent("a.b.c") +data class SampleEvent( + val buildId: String, + var retryTime: Int = 2, + var delayMills: Int = 0 +) diff --git a/src/backend/ci/core/common/common-stream/src/test/kotlin/com/tencent/devops/common/stream/pulsar/util/PulsarTopicUtilsTest.kt b/src/backend/ci/core/common/common-stream/src/test/kotlin/com/tencent/devops/common/stream/pulsar/util/PulsarTopicUtilsTest.kt index b48688cbc2e..8e37354db6c 100644 --- a/src/backend/ci/core/common/common-stream/src/test/kotlin/com/tencent/devops/common/stream/pulsar/util/PulsarTopicUtilsTest.kt +++ b/src/backend/ci/core/common/common-stream/src/test/kotlin/com/tencent/devops/common/stream/pulsar/util/PulsarTopicUtilsTest.kt @@ -33,12 +33,19 @@ internal class PulsarTopicUtilsTest { @Test fun testReflections() { - val config = ConfigurationBuilder() - config.addUrls(ClasspathHelper.forPackage("com.tencent.devops")) - config.setExpandSuperTypes(true) - config.setScanners(Scanners.Resources) - val reflections = Reflections(config) + val reflections = Reflections( + ConfigurationBuilder() + .addUrls(ClasspathHelper.forPackage("com.tencent.devops")) + .setExpandSuperTypes(true) + ) val re = reflections.getTypesAnnotatedWith(StreamEvent::class.java) println(re) + re.forEach { clazz -> + val streamEvent = clazz.getAnnotation(StreamEvent::class.java) + println( + "Found StreamEvent class: ${clazz.canonicalName}, " + + "with destination[${streamEvent.destination}]" + ) + } } } diff --git a/src/backend/ci/core/log/biz-log/src/main/kotlin/com/tencent/devops/log/configuration/LogMQConfiguration.kt b/src/backend/ci/core/log/biz-log/src/main/kotlin/com/tencent/devops/log/configuration/LogMQConfiguration.kt index 6eba034721c..a2b57356eaa 100644 --- a/src/backend/ci/core/log/biz-log/src/main/kotlin/com/tencent/devops/log/configuration/LogMQConfiguration.kt +++ b/src/backend/ci/core/log/biz-log/src/main/kotlin/com/tencent/devops/log/configuration/LogMQConfiguration.kt @@ -27,6 +27,7 @@ package com.tencent.devops.log.configuration +import com.tencent.devops.common.stream.annotation.StreamConsumer import com.tencent.devops.common.stream.constants.StreamBinding import com.tencent.devops.log.event.LogOriginEvent import com.tencent.devops.log.event.LogStatusEvent @@ -57,7 +58,7 @@ class LogMQConfiguration @Autowired constructor() { logServiceConfig: LogServiceConfig ) = BuildLogPrintService(streamBridge, logPrintBean, storageProperties, logServiceConfig) - @Bean(StreamBinding.BINDING_LOG_ORIGIN_EVENT_IN) + @StreamConsumer(StreamBinding.BINDING_LOG_ORIGIN_EVENT_IN) fun logOriginEventIn( listenerService: BuildLogListenerService ): Consumer> { @@ -66,7 +67,7 @@ class LogMQConfiguration @Autowired constructor() { } } - @Bean(StreamBinding.BINDING_LOG_STORAGE_EVENT_IN) + @StreamConsumer(StreamBinding.BINDING_LOG_STORAGE_EVENT_IN) fun logStorageEventIn( listenerService: BuildLogListenerService ): Consumer> { @@ -75,7 +76,7 @@ class LogMQConfiguration @Autowired constructor() { } } - @Bean(StreamBinding.BINDING_LOG_STATUS_EVENT_IN) + @StreamConsumer(StreamBinding.BINDING_LOG_STATUS_EVENT_IN) fun logStatusEventIn( listenerService: BuildLogListenerService ): Consumer> { diff --git a/src/backend/ci/core/log/biz-log/src/test/kotlin/com/tencent/devops/log/util/IndexNameUtilsTest.kt b/src/backend/ci/core/log/biz-log/src/test/kotlin/com/tencent/devops/log/util/IndexNameUtilsTest.kt index 4d1c96acc92..135c41715af 100644 --- a/src/backend/ci/core/log/biz-log/src/test/kotlin/com/tencent/devops/log/util/IndexNameUtilsTest.kt +++ b/src/backend/ci/core/log/biz-log/src/test/kotlin/com/tencent/devops/log/util/IndexNameUtilsTest.kt @@ -1,9 +1,15 @@ package com.tencent.devops.log.util import com.github.benmanes.caffeine.cache.Caffeine -import java.util.concurrent.TimeUnit +import com.tencent.devops.common.stream.annotation.StreamConsumer +import com.tencent.devops.common.stream.annotation.StreamEvent import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import org.reflections.Reflections +import org.reflections.scanners.Scanners +import org.reflections.util.ClasspathHelper +import org.reflections.util.ConfigurationBuilder +import java.util.concurrent.TimeUnit class IndexNameUtilsTest { @@ -32,4 +38,35 @@ class IndexNameUtilsTest { } println(indexCache.stats()) } + + @Test + fun testReflections() { + val re = Reflections( + ConfigurationBuilder() + .addUrls(ClasspathHelper.forPackage("com.tencent.devops")) + .setExpandSuperTypes(true) + ).getTypesAnnotatedWith(StreamEvent::class.java) + println(re) + re.forEach { clazz -> + val streamEvent = clazz.getAnnotation(StreamEvent::class.java) + println( + "Found StreamEvent class: ${clazz.canonicalName}, " + + "with destination[${streamEvent.destination}]" + ) + } + val re1 = Reflections( + ConfigurationBuilder() + .addUrls(ClasspathHelper.forPackage("com.tencent.devops")) + .setExpandSuperTypes(true) + .setScanners(Scanners.MethodsAnnotated) + ).getMethodsAnnotatedWith(StreamConsumer::class.java) + println(re1) + re1.forEach { method -> + val streamEvent = method.getAnnotation(StreamConsumer::class.java) + println( + "Found StreamEvent class: ${method.name}, " + + "with destination[${streamEvent.streamEvent}]" + ) + } + } }