Skip to content

Commit

Permalink
feat: 引擎等MQ场景接入SCS框架 TencentBlueKing#7443
Browse files Browse the repository at this point in the history
  • Loading branch information
royalhuang committed Sep 23, 2022
1 parent 795ff77 commit c691757
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package com.tencent.devops.common.stream.config

import com.tencent.devops.common.stream.annotation.StreamConsumer
import com.tencent.devops.common.stream.annotation.StreamEvent
import com.tencent.devops.common.stream.utils.DefaultBindingUtils
import org.reflections.Reflections
import org.reflections.scanners.Scanners
import org.reflections.util.ClasspathHelper
Expand All @@ -53,14 +54,16 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered
// 如果未配置服务使用的binder类型,则使用全局默认binder类型
// 如果均未配置则不进行注解的反射解析
val definition = mutableListOf<String>()

// 反射扫描所有带有 StreamEvent 注解的事件类
val eventClasses = Reflections(
ConfigurationBuilder()
.addUrls(ClasspathHelper.forPackage("com.tencent.devops"))
.setExpandSuperTypes(true)
).getTypesAnnotatedWith(StreamEvent::class.java)
eventClasses.forEach { clazz ->
val streamEvent = clazz.getAnnotation(StreamEvent::class.java)
val bindingName = "${clazz.simpleName.decapitalize()}Out"
val bindingName = DefaultBindingUtils.getOutBindingName(clazz)
logger.info(
"Found StreamEvent class: ${clazz.name}, bindingName[$bindingName], " +
"with destination[${streamEvent.destination}, delayMills[${streamEvent.delayMills}]"
Expand All @@ -69,26 +72,52 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered
val prefix = "spring.cloud.stream.bindings.$bindingName"
setProperty("$prefix.destination", streamEvent.destination)
}
val consumerBeans = Reflections(
val consumerMethodBeans = Reflections(
ConfigurationBuilder()
.addUrls(ClasspathHelper.forPackage("com.tencent.devops"))
.setExpandSuperTypes(true)
.setScanners(Scanners.MethodsAnnotated)
).getMethodsAnnotatedWith(StreamConsumer::class.java)
consumerBeans.forEach { method ->

// 反射扫描所有带有 StreamConsumer 注解的bean方法
consumerMethodBeans.forEach { method ->
val streamConsumer = method.getAnnotation(StreamConsumer::class.java)
val bindingName = "${method.name}In"
val bindingName = DefaultBindingUtils.getInBindingName(method)
logger.info(
"Found StreamConsumer method: ${method.name}, bindingName[$bindingName], " +
"with destination[${streamConsumer.destination}], group[${streamConsumer.group}]"
)
definition.add(bindingName)
// 如果注解中指定了订阅组,则直接设置
// 如果未指定则取当前服务名作为订阅组,保证所有分布式服务再同一个组内
val prefix = "spring.cloud.stream.bindings.${method.name}In-in-0"
val prefix = "spring.cloud.stream.bindings.$bindingName-in-0"
setProperty("$prefix.destination", streamConsumer.destination)
setProperty("$prefix.group", streamConsumer.group)
}

// 反射扫描所有带有 StreamConsumer 注解的bean类型
val consumerClassBeans = Reflections(
ConfigurationBuilder()
.addUrls(ClasspathHelper.forPackage("com.tencent.devops"))
.setExpandSuperTypes(true)
.setScanners(Scanners.MethodsAnnotated)
).getTypesAnnotatedWith(StreamConsumer::class.java)
consumerClassBeans.forEach { clazz ->
val streamConsumer = clazz.getAnnotation(StreamConsumer::class.java)
val bindingName = DefaultBindingUtils.getInBindingName(clazz)
logger.info(
"Found StreamConsumer class: ${clazz.name}, bindingName[$bindingName], " +
"with destination[${streamConsumer.destination}], group[${streamConsumer.group}]"
)
definition.add(bindingName)
// 如果注解中指定了订阅组,则直接设置
// 如果未指定则取当前服务名作为订阅组,保证所有分布式服务再同一个组内
val prefix = "spring.cloud.stream.bindings.$bindingName-in-0"
setProperty("$prefix.destination", streamConsumer.destination)
setProperty("$prefix.group", streamConsumer.group)
}

// 声明所有扫描结果的函数式声明
setProperty("spring.cloud.stream.function.definition", definition.joinToString(";"))
return PropertiesPropertySource(STREAM_SOURCE_NAME, this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package com.tencent.devops.common.stream.pojo

import com.tencent.devops.common.stream.annotation.StreamEvent
import com.tencent.devops.common.stream.constants.StreamHeader.X_DELAY
import com.tencent.devops.common.stream.utils.DefaultBindingUtils
import org.slf4j.LoggerFactory
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.Message
Expand All @@ -49,7 +50,7 @@ open class IEvent(
fun sendTo(bridge: StreamBridge) {
try {
val eventType = this::class.java.annotations.find { s -> s is StreamEvent } as StreamEvent
bridge.send("${this::class.java.simpleName.decapitalize()}Out", buildMessage(eventType.delayMills))
bridge.send(DefaultBindingUtils.getOutBindingName(this::class.java), buildMessage(eventType.delayMills))
} catch (ignored: Exception) {
logger.error("[STREAM MQ] Fail to dispatch the event($this)", ignored)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.tencent.devops.common.stream.utils

import java.lang.reflect.Method

object DefaultBindingUtils {

fun getOutBindingName(clazz: Class<*>) = "${clazz.simpleName.decapitalize()}Out"

fun getInBindingName(method: Method) = "${method.name}In"

fun getInBindingName(clazz: Class<*>) = "${clazz.simpleName.decapitalize()}In"
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import com.tencent.devops.common.stream.annotation.StreamEvent
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.reflections.Reflections
import org.reflections.scanners.Scanners
import org.reflections.util.ClasspathHelper
import org.reflections.util.ConfigurationBuilder

Expand Down

0 comments on commit c691757

Please sign in to comment.