Skip to content

Commit

Permalink
feat: 引擎等MQ场景接入SCS框架 #7443 接入websocket和auth服务
Browse files Browse the repository at this point in the history
  • Loading branch information
royalhuang committed Oct 14, 2022
1 parent e5a6a6b commit 751d9a4
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,15 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered
// 反射扫描所有带有 StreamConsumer 注解的bean方法
consumerMethodBeans.forEach { method ->
val consumer = method.getAnnotation(StreamEventConsumer::class.java)
val bindingName = DefaultBindingUtils.getInBindingName(
method = method,
suffix = if (consumer.separately) hostName else null
)
val bindingName = DefaultBindingUtils.getInBindingName(method)
logger.info(
"Found StreamConsumer method: ${method.name}, bindingName[$bindingName], " +
"with destination[${consumer.destination}], group[${consumer.group}]"
)
definition.add(bindingName)
// 如果注解中指定了订阅组,则直接设置
// 如果未指定则取当前服务名作为订阅组,保证所有分布式服务再同一个组内
setBindings(bindingName, consumer)
setBindings(bindingName, consumer, hostName)
}

// 反射扫描所有带有 StreamConsumer 注解的bean类型
Expand All @@ -117,10 +114,7 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered
).getTypesAnnotatedWith(StreamEventConsumer::class.java)
consumerClassBeans.forEach { clazz ->
val consumer = clazz.getAnnotation(StreamEventConsumer::class.java)
val bindingName = DefaultBindingUtils.getInBindingName(
clazz = clazz,
suffix = if (consumer.separately) hostName else null
)
val bindingName = DefaultBindingUtils.getInBindingName(clazz)
logger.info(
"Found StreamConsumer class: ${clazz.name}, bindingName[$bindingName], " +
"with destination[${consumer.destination}], " +
Expand All @@ -129,7 +123,7 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered
definition.add(bindingName)
// 如果注解中指定了订阅组,则直接设置
// 如果未指定则取当前服务名作为订阅组,保证所有分布式服务再同一个组内
setBindings(bindingName, consumer)
setBindings(bindingName, consumer, hostName)
}

// 声明所有扫描结果的函数式声明
Expand All @@ -138,15 +132,19 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered
}
}

private fun Properties.setBindings(bindingName: String, streamEventConsumer: StreamEventConsumer) {
private fun Properties.setBindings(
bindingName: String,
consumer: StreamEventConsumer,
hostName: String
) {
val bindingPrefix = "spring.cloud.stream.bindings.$bindingName-in-0"
val rabbitPropPrefix = "spring.cloud.stream.rabbit.bindings.$bindingName-in-0"
setProperty("$bindingPrefix.destination", streamEventConsumer.destination)
setProperty("$bindingPrefix.group", streamEventConsumer.group)
setProperty("$bindingPrefix.destination", consumer.destination)
setProperty("$bindingPrefix.group", if (consumer.separately) hostName else consumer.group)
setProperty("$rabbitPropPrefix.consumer.delayedExchange", "true")
setProperty(
"$rabbitPropPrefix.consumer.consumerTagPrefix",
"\${spring.application.name}-${streamEventConsumer.group}-\${spring.cloud.stream.instance-index}"
"\${spring.application.name}-${consumer.group}-\${spring.cloud.stream.instance-index}"
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
package com.tencent.devops.common.stream.utils

import com.tencent.devops.common.api.util.UUIDUtil
import java.lang.reflect.Method

object DefaultBindingUtils {

fun getOutBindingName(clazz: Class<*>) = clazz.simpleName.decapitalize()

fun getInBindingName(method: Method, suffix: String?): String {
return if (suffix.isNullOrBlank()) {
method.name
} else {
"${method.name}-$suffix"
}
}
fun getInBindingName(method: Method): String = method.name

fun getInBindingName(clazz: Class<*>, suffix: String?): String {
return if (suffix.isNullOrBlank()) {
clazz.simpleName.decapitalize()
} else {
"${clazz.simpleName.decapitalize()}-$suffix"
}
}
fun getInBindingName(clazz: Class<*>) = clazz.simpleName.decapitalize()
}

0 comments on commit 751d9a4

Please sign in to comment.