Skip to content

Commit

Permalink
feat: 引擎等MQ场景接入SCS框架 TencentBlueKing#7443
Browse files Browse the repository at this point in the history
  • Loading branch information
royalhuang committed Sep 22, 2022
1 parent be71db1 commit ad9d441
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import org.reflections.scanners.Scanners
import org.reflections.util.ClasspathHelper
import org.reflections.util.ConfigurationBuilder
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.SpringApplication
import org.springframework.boot.context.config.ConfigDataEnvironmentPostProcessor
import org.springframework.boot.env.EnvironmentPostProcessor
Expand All @@ -43,18 +42,8 @@ import org.springframework.core.env.ConfigurableEnvironment
import org.springframework.core.env.PropertiesPropertySource
import java.util.Properties

// TODO #7443
class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered {

@Value("\${spring.cloud.stream.default-binder:#{null}}")
private val defaultBinder: String? = null

@Value("\${spring.cloud.stream.service-binder:#{null}}")
private val serviceBinder: String? = null

@Value("\${spring.application.name:#{null}}")
private val serviceName: String? = null

override fun postProcessEnvironment(environment: ConfigurableEnvironment, application: SpringApplication) {
environment.propertySources.addLast(createPropertySource())
}
Expand All @@ -63,7 +52,7 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered
with(Properties()) {
// 如果未配置服务使用的binder类型,则使用全局默认binder类型
// 如果均未配置则不进行注解的反射解析
val binder = serviceBinder ?: defaultBinder ?: return PropertiesPropertySource(STREAM_SOURCE_NAME, this)
val binder = "\${spring.cloud.stream.service-binder}"
val definition = mutableListOf<String>()
val eventClasses = Reflections(
ConfigurationBuilder()
Expand All @@ -77,8 +66,9 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered
"with destination[${streamEvent.destination}]"
)
definition.add(clazz.simpleName)
setProperty("spring.cloud.stream.bindings.${clazz.simpleName}.destination", streamEvent.destination)
setProperty("spring.cloud.stream.bindings.${clazz.simpleName}.destination", binder)
val prefix = "spring.cloud.stream.bindings.${clazz.simpleName}"
setProperty("$prefix.destination", streamEvent.destination)
setProperty("$prefix.binder", binder)
}
val consumerBeans = Reflections(
ConfigurationBuilder()
Expand All @@ -95,25 +85,19 @@ class StreamBindingEnvironmentPostProcessor : EnvironmentPostProcessor, Ordered
definition.add(method.name)
// 如果注解中指定了订阅组,则直接设置
// 如果未指定则取当前服务名作为订阅组,保证所有分布式服务再同一个组内
val subscriptionGroup = streamConsumer.group.ifBlank {
serviceName ?: "default"
}
setProperty(
"spring.cloud.stream.bindings.${method.name}-in-0",
streamConsumer.destination
)
setProperty(
"spring.cloud.stream.bindings.bindings.${method.name}-in-0",
subscriptionGroup
)
val subscriptionGroup = "\${spring.application.name}"
val prefix = "spring.cloud.stream.bindings.${method.name}-in-0"
setProperty("$prefix.destination", streamConsumer.destination)
setProperty("$prefix.group", subscriptionGroup)
setProperty("$prefix.binder", binder)
}
setProperty("spring.cloud.stream.function.definition", definition.joinToString(";"))
return PropertiesPropertySource(STREAM_SOURCE_NAME, this)
}
}

override fun getOrder(): Int {
return ConfigDataEnvironmentPostProcessor.ORDER - 1
return ConfigDataEnvironmentPostProcessor.ORDER + 1
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ open class IEvent(
fun sendTo(bridge: StreamBridge) {
try {
val eventType = this::class.java.annotations.find { s -> s is StreamEvent } as StreamEvent
// TODO #7443
bridge.send(eventType.destination, buildMessage(eventType.delayMills))
} catch (ignored: Exception) {
logger.error("[STREAM MQ] Fail to dispatch the event($this)", ignored)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package com.tencent.devops.common.stream.pulsar.config

import com.tencent.devops.common.stream.config.StreamBindingEnvironmentPostProcessor
import com.tencent.devops.common.stream.pulsar.convert.PulsarMessageConverter
import com.tencent.devops.common.stream.pulsar.custom.PulsarConfigBeanPostProcessor
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
Expand All @@ -53,6 +54,11 @@ class ExtendedBindingHandlerMappingsProviderConfiguration {
return PulsarConfigBeanPostProcessor()
}

@Bean
fun streamBindingEnvironmentPostProcessor(): StreamBindingEnvironmentPostProcessor {
return StreamBindingEnvironmentPostProcessor()
}

/**
* if you want to customize a bean, please use this BeanName `PulsarMessageConverter.DEFAULT_NAME`.
*/
Expand Down

0 comments on commit ad9d441

Please sign in to comment.