Skip to content

Commit

Permalink
feat: 引擎等MQ场景接入SCS框架 TencentBlueKing#7443
Browse files Browse the repository at this point in the history
  • Loading branch information
royalhuang committed Sep 14, 2022
1 parent e0793ca commit 13022d7
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ import org.springframework.context.annotation.Bean

/**
* Stream消费者注解
* @param destination 发送目标绑定,用于指定RabbitMQ的Exchange,Kafka和Pulsar的Topic
* @param streamEvent 目标绑定接受的事件
* @param group 指定订阅组,如果是广播事件则需要指定,否则为非广播默认订阅组
*/
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
@Bean
annotation class StreamConsumer(
val destination: String,
val streamEvent: StreamEvent,
val group: String = ""
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ package com.tencent.devops.common.stream.annotation

/**
* Stream事件注解
* @param outBinding 发送目标绑定,用于指定RabbitMQ的Exchange,Kafka和Pulsar的Topic
* @param destination 发送目标绑定,用于指定RabbitMQ的Exchange,Kafka和Pulsar的Topic
* @param fanout 是否为广播事件
* @param delayMills 延迟时间,如果为延迟事件则必填
*/
@Target(AnnotationTarget.FILE)
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
annotation class StreamEvent(
val outBinding: String,
val fanout: Boolean = false,
val destination: String,
val delayMills: Int = 0
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,22 @@ package com.tencent.devops.common.stream.config

import com.tencent.devops.common.stream.annotation.StreamConsumer
import com.tencent.devops.common.stream.annotation.StreamEvent
import org.reflections.Reflections
import org.reflections.scanners.Scanners
import org.reflections.util.ClasspathHelper
import org.reflections.util.ConfigurationBuilder
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.SpringApplication
import org.springframework.boot.context.config.ConfigDataEnvironmentPostProcessor
import org.springframework.boot.env.EnvironmentPostProcessor
import org.springframework.core.Ordered
import org.springframework.core.env.ConfigurableEnvironment
import org.springframework.core.env.MapPropertySource
import org.springframework.core.env.PropertiesPropertySource
import java.util.Properties

// TODO #7443
class StreamBindingEnvironmentPostProcessor: EnvironmentPostProcessor, Ordered {
class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered {

@Value("\${spring.cloud.stream.default-binder:#{null}}")
private val defaultBinder: String? = null
Expand All @@ -60,45 +63,53 @@ class StreamBindingEnvironmentPostProcessor: EnvironmentPostProcessor, Ordered {
with(Properties()) {
// 如果未配置服务使用的binder类型,则使用全局默认binder类型
// 如果均未配置则不进行注解的反射解析
val binder = serviceBinder ?: defaultBinder ?: return
val ref = Reflections()
val propertySources = applicationContext.environment.propertySources
ref.getTypesAnnotatedWith(StreamEvent::class.java).forEach { clazz ->
val binder = serviceBinder ?: defaultBinder ?: return PropertiesPropertySource(STREAM_SOURCE_NAME, this)
val eventClasses = Reflections(
ConfigurationBuilder()
.addUrls(ClasspathHelper.forPackage("com.tencent.devops"))
.setExpandSuperTypes(true)
).getTypesAnnotatedWith(StreamEvent::class.java)
eventClasses.forEach { clazz ->
val streamEvent = clazz.getAnnotation(StreamEvent::class.java)
logger.info("Found StreamEvent class: ${clazz.name}, " +
"with destination[${streamEvent.destination}]")
propertySources.addLast(
MapPropertySource(streamEvent.destination, mapOf(
"destination" to streamEvent.destination,
"binder" to binder
))
logger.info(
"Found StreamEvent class: ${clazz.name}, " +
"with destination[${streamEvent.destination}]"
)
setProperty("spring.cloud.stream.bindings.${clazz.simpleName}.destination", streamEvent.destination)
setProperty("spring.cloud.stream.bindings.${clazz.simpleName}.destination", binder)
}
ref.getTypesAnnotatedWith(StreamConsumer::class.java).forEach { clazz ->
val streamConsumer = clazz.getAnnotation(StreamConsumer::class.java)
println("Found StreamConsumer class: ${clazz.name}, " +
"with destination[${streamConsumer.destination}] group[${streamConsumer.group}]")
val consumerBeans = Reflections(
ConfigurationBuilder()
.addUrls(ClasspathHelper.forPackage("com.tencent.devops"))
.setExpandSuperTypes(true)
.setScanners(Scanners.MethodsAnnotated)
).getTypesAnnotatedWith(StreamConsumer::class.java)
consumerBeans.forEach { method ->
val streamConsumer = method.getAnnotation(StreamConsumer::class.java)
println(
"Found StreamConsumer class: ${method.name}, " +
"with destination[${streamConsumer.streamEvent}] group[${streamConsumer.group}]"
)
// 如果注解中指定了订阅组,则直接设置
// 如果未指定则取当前服务名作为订阅组,保证所有分布式服务再同一个组内
val subscriptionGroup = streamConsumer.group.ifBlank {
serviceName?.let { "$it-service" } ?: "default"
serviceName ?: "default"
}
propertySources.addLast(
MapPropertySource(clazz.simpleName, mapOf(
"destination" to streamConsumer.destination,
"group" to subscriptionGroup,
"binder" to binder
))
setProperty(
"spring.cloud.stream.bindings.${method.name}-in-0",
streamConsumer.streamEvent.destination
)
setProperty(
"spring.cloud.stream.bindings.bindings.${method.name}-in-0",
subscriptionGroup
)
}
setProperty("spring.cloud.consul.config.name", "")
setProperty("spring.cloud.consul.config.name", "")
return PropertiesPropertySource(STREAM_SOURCE_NAME, this)
}
}

override fun getOrder(): Int {
return ConfigDataEnvironmentPostProcessor.ORDER - 1
return ConfigDataEnvironmentPostProcessor.ORDER - 1
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ open class IEvent(
fun sendTo(bridge: StreamBridge) {
try {
val eventType = this::class.java.annotations.find { s -> s is StreamEvent } as StreamEvent
bridge.send(eventType.outBinding, buildMessage(eventType.delayMills))
bridge.send(eventType.destination, buildMessage(eventType.delayMills))
} catch (ignored: Exception) {
logger.error("[STREAM MQ] Fail to dispatch the event($this)", ignored)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.devops.common.stream.pojo

import com.tencent.devops.common.stream.annotation.StreamEvent
@StreamEvent("a.b.c")
data class SampleEvent(
val buildId: String,
var retryTime: Int = 2,
var delayMills: Int = 0
)
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,19 @@ internal class PulsarTopicUtilsTest {

@Test
fun testReflections() {
val config = ConfigurationBuilder()
config.addUrls(ClasspathHelper.forPackage("com.tencent.devops"))
config.setExpandSuperTypes(true)
config.setScanners(Scanners.Resources)
val reflections = Reflections(config)
val reflections = Reflections(
ConfigurationBuilder()
.addUrls(ClasspathHelper.forPackage("com.tencent.devops"))
.setExpandSuperTypes(true)
)
val re = reflections.getTypesAnnotatedWith(StreamEvent::class.java)
println(re)
re.forEach { clazz ->
val streamEvent = clazz.getAnnotation(StreamEvent::class.java)
println(
"Found StreamEvent class: ${clazz.canonicalName}, " +
"with destination[${streamEvent.destination}]"
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package com.tencent.devops.log.configuration

import com.tencent.devops.common.stream.annotation.StreamConsumer
import com.tencent.devops.common.stream.constants.StreamBinding
import com.tencent.devops.log.event.LogOriginEvent
import com.tencent.devops.log.event.LogStatusEvent
Expand Down Expand Up @@ -57,7 +58,7 @@ class LogMQConfiguration @Autowired constructor() {
logServiceConfig: LogServiceConfig
) = BuildLogPrintService(streamBridge, logPrintBean, storageProperties, logServiceConfig)

@Bean(StreamBinding.BINDING_LOG_ORIGIN_EVENT_IN)
@StreamConsumer(StreamBinding.BINDING_LOG_ORIGIN_EVENT_IN)
fun logOriginEventIn(
listenerService: BuildLogListenerService
): Consumer<Message<LogOriginEvent>> {
Expand All @@ -66,7 +67,7 @@ class LogMQConfiguration @Autowired constructor() {
}
}

@Bean(StreamBinding.BINDING_LOG_STORAGE_EVENT_IN)
@StreamConsumer(StreamBinding.BINDING_LOG_STORAGE_EVENT_IN)
fun logStorageEventIn(
listenerService: BuildLogListenerService
): Consumer<Message<LogStorageEvent>> {
Expand All @@ -75,7 +76,7 @@ class LogMQConfiguration @Autowired constructor() {
}
}

@Bean(StreamBinding.BINDING_LOG_STATUS_EVENT_IN)
@StreamConsumer(StreamBinding.BINDING_LOG_STATUS_EVENT_IN)
fun logStatusEventIn(
listenerService: BuildLogListenerService
): Consumer<Message<LogStatusEvent>> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package com.tencent.devops.log.util

import com.github.benmanes.caffeine.cache.Caffeine
import java.util.concurrent.TimeUnit
import com.tencent.devops.common.stream.annotation.StreamConsumer
import com.tencent.devops.common.stream.annotation.StreamEvent
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.reflections.Reflections
import org.reflections.scanners.Scanners
import org.reflections.util.ClasspathHelper
import org.reflections.util.ConfigurationBuilder
import java.util.concurrent.TimeUnit

class IndexNameUtilsTest {

Expand Down Expand Up @@ -32,4 +38,35 @@ class IndexNameUtilsTest {
}
println(indexCache.stats())
}

@Test
fun testReflections() {
val re = Reflections(
ConfigurationBuilder()
.addUrls(ClasspathHelper.forPackage("com.tencent.devops"))
.setExpandSuperTypes(true)
).getTypesAnnotatedWith(StreamEvent::class.java)
println(re)
re.forEach { clazz ->
val streamEvent = clazz.getAnnotation(StreamEvent::class.java)
println(
"Found StreamEvent class: ${clazz.canonicalName}, " +
"with destination[${streamEvent.destination}]"
)
}
val re1 = Reflections(
ConfigurationBuilder()
.addUrls(ClasspathHelper.forPackage("com.tencent.devops"))
.setExpandSuperTypes(true)
.setScanners(Scanners.MethodsAnnotated)
).getMethodsAnnotatedWith(StreamConsumer::class.java)
println(re1)
re1.forEach { method ->
val streamEvent = method.getAnnotation(StreamConsumer::class.java)
println(
"Found StreamEvent class: ${method.name}, " +
"with destination[${streamEvent.streamEvent}]"
)
}
}
}

0 comments on commit 13022d7

Please sign in to comment.