Skip to content

Commit

Permalink
Collect error spans from providers (emeraldpay#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Apr 11, 2023
1 parent fc488e1 commit 6b0e201
Show file tree
Hide file tree
Showing 12 changed files with 540 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Defaults {

companion object {
const val maxMessageSize: Int = 32 * 1024 * 1024
const val maxMetadataSize = 16384
val timeout: Duration = Duration.ofSeconds(60)
val timeoutInternal: Duration = timeout.dividedBy(4)
val retryConnection: Duration = Duration.ofSeconds(10)
Expand Down
15 changes: 14 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/GrpcServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import io.micrometer.core.instrument.Metrics
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.scheduling.concurrent.CustomizableThreadFactory
import org.springframework.stereotype.Service
import java.net.InetSocketAddress
Expand All @@ -42,8 +45,13 @@ open class GrpcServer(
private val mainConfig: MainConfig,
private val tlsSetup: TlsSetup,
private val accessHandler: AccessHandlerGrpc,
private val grpcServerBraveInterceptor: ServerInterceptor
private val grpcServerBraveInterceptor: ServerInterceptor,
@Autowired(required = false)
@Qualifier("serverSpansInterceptor")
private val serverSpansInterceptor: ServerInterceptor?
) {
@Value("\${spring.application.max-metadata-size}")
private var maxMetadataSize: Int = Defaults.maxMetadataSize

private val log = LoggerFactory.getLogger(GrpcServer::class.java)

Expand All @@ -68,6 +76,7 @@ open class GrpcServer(
val serverBuilder = NettyServerBuilder
.forAddress(InetSocketAddress(mainConfig.host, mainConfig.port))
.maxInboundMessageSize(Defaults.maxMessageSize)
.maxInboundMetadataSize(maxMetadataSize)

if (mainConfig.accessLogConfig.enabled) {
serverBuilder.intercept(accessHandler)
Expand All @@ -78,6 +87,10 @@ open class GrpcServer(
}

serverBuilder.intercept(grpcServerBraveInterceptor)
serverSpansInterceptor?.let {
serverBuilder.intercept(it)
log.info("Collect spans from provider is enabled")
}

tlsSetup.setupServer("Native gRPC", mainConfig.tls, true)?.let {
serverBuilder.sslContext(it)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.emeraldpay.dshackle.config.spans

import brave.Tracer
import brave.handler.MutableSpan
import brave.handler.SpanHandler
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import io.grpc.CallOptions
import io.grpc.Channel
import io.grpc.ClientCall
import io.grpc.ClientInterceptor
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall
import io.grpc.ForwardingClientCallListener
import io.grpc.Metadata
import io.grpc.Metadata.ASCII_STRING_MARSHALLER
import io.grpc.MethodDescriptor
import org.springframework.beans.factory.annotation.Qualifier

/**
 * gRPC client interceptor that reads the spans a provider attached to the response headers of
 * `emerald.Blockchain/NativeCall` and hands each of them to the local [SpanHandler].
 *
 * Calls to any other method are passed through untouched. Processing the header is best effort:
 * a missing current span or an unreadable header is logged and never prevents the response
 * headers from reaching the original listener.
 */
class ClientSpansInterceptor(
    private val zipkinSpanHandler: SpanHandler,
    private val tracer: Tracer,
    @Qualifier("spanMapper")
    private val spanMapper: ObjectMapper
) : ClientInterceptor {

    private val log = org.slf4j.LoggerFactory.getLogger(ClientSpansInterceptor::class.java)

    // Built once instead of on every response; the key is the same for all calls.
    private val spanHeaderKey: Metadata.Key<String> = Metadata.Key.of(SPAN_HEADER, ASCII_STRING_MARSHALLER)

    /**
     * Wraps `NativeCall` invocations so their response headers can be inspected; every other
     * method gets the plain call created by [next].
     */
    override fun <ReqT : Any?, RespT : Any?> interceptCall(
        method: MethodDescriptor<ReqT, RespT>,
        callOptions: CallOptions,
        next: Channel
    ): ClientCall<ReqT, RespT> {
        return if (method.fullMethodName == "emerald.Blockchain/NativeCall") {
            SpanClientCall(next.newCall(method, callOptions))
        } else {
            next.newCall(method, callOptions)
        }
    }

    /**
     * Reports the spans carried in the [SPAN_HEADER] response header, if any.
     *
     * Does nothing when the header is absent or blank, or when there is no current span to take
     * the trace context from.
     */
    private fun reportProviderSpans(headers: Metadata) {
        val serialized = headers[spanHeaderKey]?.takeIf { it.isNotBlank() } ?: return
        // currentSpan() is nullable in Brave; dereferencing it blindly used to throw here.
        val context = tracer.currentSpan()?.context()
        if (context == null) {
            log.debug("No current span, spans received from provider are skipped")
            return
        }
        spanMapper.readValue<List<MutableSpan>>(serialized).forEach { span ->
            zipkinSpanHandler.end(context, span, SpanHandler.Cause.FINISHED)
        }
    }

    /** Client call that substitutes the response listener with [SpanClientCallListener]. */
    private inner class SpanClientCall<ReqT, RespT>(
        delegate: ClientCall<ReqT, RespT>?
    ) : SimpleForwardingClientCall<ReqT, RespT>(delegate) {
        override fun start(responseListener: Listener<RespT>, headers: Metadata) {
            val spanResponseListener = SpanClientCallListener(responseListener)
            super.start(spanResponseListener, headers)
        }
    }

    /** Listener that extracts provider spans from the response headers before forwarding them. */
    private inner class SpanClientCallListener<RespT>(
        delegate: ClientCall.Listener<RespT>,
    ) : ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(delegate) {
        override fun onHeaders(headers: Metadata) {
            try {
                reportProviderSpans(headers)
            } catch (e: Exception) {
                // Span collection is diagnostic only; a bad header must not break the actual call.
                log.warn("Unable to process spans received from provider", e)
            }
            super.onHeaders(headers)
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.emeraldpay.dshackle.config.spans

import brave.handler.MutableSpan
import brave.handler.SpanHandler
import brave.propagation.TraceContext
import com.fasterxml.jackson.databind.ObjectMapper
import com.github.benmanes.caffeine.cache.Caffeine
import io.emeraldpay.dshackle.commons.SPAN_ERROR
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.cloud.sleuth.Span
import java.time.Duration

/**
 * Brave [SpanHandler] that keeps finished spans in an in-memory cache, grouped by the id of their
 * parent span, so that the spans belonging to one request can be serialized and returned to the
 * caller when at least one of them carries the [SPAN_ERROR] tag.
 *
 * Cache entries expire five minutes after they are written.
 */
class ErrorSpanHandler(
    @Qualifier("spanMapper")
    private val spanMapper: ObjectMapper,
) : SpanHandler() {
    // parent span id -> spans that finished with that parent
    private val spans = Caffeine
        .newBuilder()
        .expireAfterWrite(Duration.ofMinutes(5))
        .build<String, MutableList<MutableSpan>>()

    /**
     * Stores [span] under its parent id when it has a parent and its trace id is longer than
     * 20 characters, then delegates to the default handler.
     */
    override fun end(context: TraceContext, span: MutableSpan, cause: Cause): Boolean {
        val parentId = span.parentId()
        // NOTE(review): the length check presumably selects 128-bit (32 hex chars) trace ids - confirm.
        if (span.traceId().length > 20 && parentId != null) {
            // Spans may be finished on different threads while getErrorSpans() reads the same list,
            // so the list must tolerate concurrent add and iteration.
            spans.asMap()
                .computeIfAbsent(parentId) { java.util.concurrent.CopyOnWriteArrayList<MutableSpan>() }
                .add(span)
        }
        return super.end(context, span, cause)
    }

    /**
     * Collects the cached descendants of the span [spanId] plus the span itself, removes them from
     * the cache and returns them serialized with the span mapper.
     *
     * [currentSpan] is ended as part of the call.
     *
     * @param spanId id of the current span, used as the root of the lookup
     * @param currentSpan the span identified by [spanId]
     * @return the serialized spans if any descendant has the [SPAN_ERROR] tag, otherwise an empty string
     */
    fun getErrorSpans(spanId: String, currentSpan: Span): String {
        val spansInfo = SpansInfo()

        enrichErrorSpans(spanId, spansInfo)
        // Presumably ending the span routes it through end() above, which files it under its
        // parent id - this relies on the handler being registered with the tracer.
        currentSpan.end()

        val siblings: List<MutableSpan> = currentSpan.context().parentId()
            ?.let { spans.getIfPresent(it) }
            ?: emptyList()
        // Pick the current span by id: the parent can have several children, so the first entry is
        // not necessarily ours. Fall back to the first entry to keep the previous behaviour when
        // no id matches.
        val ownSpan = siblings.firstOrNull { it.id() == spanId } ?: siblings.firstOrNull()
        if (ownSpan != null) {
            spansInfo.spans.add(ownSpan)
        }

        // Everything that was collected has been consumed, drop the groups it came from.
        spansInfo.spans
            .mapNotNull { it.parentId() }
            .toSet()
            .forEach { spans.invalidate(it) }

        return if (spansInfo.hasError) {
            spanMapper.writeValueAsString(spansInfo.spans)
        } else {
            ""
        }
    }

    /**
     * Recursively adds every cached span below [spanId] to [spansInfo] and marks it as erroneous
     * when one of them has the [SPAN_ERROR] tag.
     */
    private fun enrichErrorSpans(spanId: String, spansInfo: SpansInfo) {
        val currentSpans: List<MutableSpan>? = spans.getIfPresent(spanId)

        currentSpans?.forEach {
            spansInfo.spans.add(it)
            if (it.tags().containsKey(SPAN_ERROR)) {
                spansInfo.hasError = true
            }
            // A span stored under its own id would otherwise recurse forever.
            if (spanId != it.id()) {
                enrichErrorSpans(it.id(), spansInfo)
            }
        }
    }

    /** Accumulator for one lookup: the collected spans and whether any of them has an error tag. */
    private data class SpansInfo(
        var hasError: Boolean = false,
        val spans: MutableList<MutableSpan> = mutableListOf()
    )
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.emeraldpay.dshackle.config.spans

import io.grpc.ForwardingServerCall.SimpleForwardingServerCall
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import org.springframework.cloud.sleuth.Tracer

/**
 * gRPC server interceptor that, for `emerald.Blockchain/NativeCall`, puts the spans returned by
 * [ErrorSpanHandler.getErrorSpans] into the [SPAN_HEADER] response header.
 *
 * Response headers are always forwarded to the underlying call, whether or not spans could be
 * attached.
 */
class ServerSpansInterceptor(
    private val tracer: Tracer,
    private val errorSpanHandler: ErrorSpanHandler
) : ServerInterceptor {

    private val log = org.slf4j.LoggerFactory.getLogger(ServerSpansInterceptor::class.java)

    // Built once instead of on every response; the key is the same for all calls.
    private val spanHeaderKey: Metadata.Key<String> =
        Metadata.Key.of(SPAN_HEADER, Metadata.ASCII_STRING_MARSHALLER)

    /** Wraps the server call of `NativeCall` requests; all other methods use [call] as is. */
    override fun <ReqT : Any?, RespT : Any?> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        val serverCall = if (call.methodDescriptor.fullMethodName == "emerald.Blockchain/NativeCall") {
            SpanServerCall(call)
        } else {
            call
        }

        return next.startCall(serverCall, headers)
    }

    /**
     * Adds the serialized error spans of the current span to [headers].
     *
     * Nothing is added when there is no current span, when the current span has no parent, or
     * when the handler returns a blank result.
     */
    private fun attachErrorSpans(headers: Metadata) {
        val current = tracer.currentSpan() ?: return
        if (current.context().parentId() == null) {
            return
        }
        val spans = errorSpanHandler.getErrorSpans(current.context().spanId(), current)
        if (spans.isNotBlank()) {
            headers.put(spanHeaderKey, spans)
        }
    }

    /** Server call that enriches the response headers before they are sent. */
    private inner class SpanServerCall<ReqT, RespT>(
        private val call: ServerCall<ReqT, RespT>
    ) : SimpleForwardingServerCall<ReqT, RespT>(call) {
        override fun sendHeaders(headers: Metadata) {
            try {
                attachErrorSpans(headers)
            } catch (e: Exception) {
                // Span collection is diagnostic only; it must not prevent the response from being sent.
                log.warn("Unable to attach error spans to response headers", e)
            }
            // Must run unconditionally: it used to be skipped when there was no current span,
            // leaving the call without response headers.
            call.sendHeaders(headers)
        }
    }
}
68 changes: 68 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/config/spans/SpanConfig.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.emeraldpay.dshackle.config.spans

import brave.Tracer
import brave.handler.SpanHandler
import com.fasterxml.jackson.annotation.JsonAutoDetect
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import io.grpc.ClientInterceptor
import io.grpc.ServerInterceptor
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

/**
 * Name of the gRPC metadata header carrying serialized spans: written by the server spans
 * interceptor into response headers and read back by the client spans interceptor.
 */
const val SPAN_HEADER = "spans"

/**
 * Spring configuration of the span collection beans.
 *
 * The whole configuration is active only when `spans.collect.enabled` is `true`; the nested
 * configurations are additionally switched by their own properties.
 */
@Configuration
@ConditionalOnProperty(value = ["spans.collect.enabled"], havingValue = "true")
open class SpanConfig {

    /**
     * Object mapper used to (de)serialize spans: detects fields only (no getters, setters or
     * creators), leaves out default values and ignores unknown properties when reading.
     */
    @Bean
    open fun spanMapper(): ObjectMapper {
        val mapper = ObjectMapper()
        mapper.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT)
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

        val fieldsOnly = mapper.serializationConfig.defaultVisibilityChecker
            .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
            .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
            .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
            .withCreatorVisibility(JsonAutoDetect.Visibility.NONE)
        mapper.setVisibility(fieldsOnly)

        return mapper
    }

    /** Beans for the side that receives spans; enabled by `spans.collect.main.enabled`. */
    @Configuration
    @ConditionalOnBean(SpanConfig::class)
    @ConditionalOnProperty(value = ["spans.collect.main.enabled"], havingValue = "true")
    open class MainSpanConfig {

        /** Client interceptor that reads spans from the response headers. */
        @Bean
        open fun clientSpansInterceptor(
            zipkinSpanHandler: SpanHandler,
            tracer: Tracer,
            @Qualifier("spanMapper")
            spanMapper: ObjectMapper
        ): ClientInterceptor {
            return ClientSpansInterceptor(zipkinSpanHandler, tracer, spanMapper)
        }
    }

    /** Beans for the side that sends spans; enabled by `spans.collect.provider.enabled`. */
    @Configuration
    @ConditionalOnBean(SpanConfig::class)
    @ConditionalOnProperty(value = ["spans.collect.provider.enabled"], havingValue = "true")
    open class ProviderSpanConfig {

        /** Span handler that stores finished spans until they are requested. */
        @Bean
        open fun errorSpanHandler(
            @Qualifier("spanMapper")
            spanMapper: ObjectMapper
        ): ErrorSpanHandler {
            return ErrorSpanHandler(spanMapper)
        }

        /** Server interceptor that writes the stored spans into the response headers. */
        @Bean
        open fun serverSpansInterceptor(
            tracer: org.springframework.cloud.sleuth.Tracer,
            errorSpanHandler: ErrorSpanHandler
        ): ServerInterceptor {
            return ServerSpansInterceptor(tracer, errorSpanHandler)
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import brave.grpc.GrpcTracing
import com.google.common.annotations.VisibleForTesting
import io.emeraldpay.dshackle.BlockchainType
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.FileResolver
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.config.ChainsConfig
Expand Down Expand Up @@ -50,8 +51,11 @@ import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import io.emeraldpay.dshackle.upstream.forkchoice.MostWorkForkChoice
import io.emeraldpay.dshackle.upstream.forkchoice.NoChoiceWithPriorityForkChoice
import io.emeraldpay.dshackle.upstream.grpc.GrpcUpstreams
import io.grpc.ClientInterceptor
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.ApplicationArguments
import org.springframework.boot.ApplicationRunner
import org.springframework.context.ApplicationEventPublisher
Expand All @@ -76,8 +80,12 @@ open class ConfiguredUpstreams(
private val channelExecutor: Executor,
private val chainsConfig: ChainsConfig,
private val grpcTracing: GrpcTracing,
private val wsConnectionResubscribeScheduler: Scheduler
private val wsConnectionResubscribeScheduler: Scheduler,
@Autowired(required = false)
private val clientSpansInterceptor: ClientInterceptor?
) : ApplicationRunner {
@Value("\${spring.application.max-metadata-size}")
private var maxMetadataSize: Int = Defaults.maxMetadataSize

private val log = LoggerFactory.getLogger(ConfiguredUpstreams::class.java)
private var seq = AtomicInteger(0)
Expand Down Expand Up @@ -340,7 +348,9 @@ open class ConfiguredUpstreams(
grpcUpstreamsScheduler,
channelExecutor,
chainsConfig,
grpcTracing
grpcTracing,
clientSpansInterceptor,
maxMetadataSize
).apply {
timeout = options.timeout
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient
import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics
import io.grpc.ClientInterceptor
import io.grpc.Codec
import io.grpc.netty.NettyChannelBuilder
import io.micrometer.core.instrument.Counter
Expand Down Expand Up @@ -72,7 +73,9 @@ class GrpcUpstreams(
private val chainStatusScheduler: Scheduler,
private val grpcExecutor: Executor,
private val chainsConfig: ChainsConfig,
private val grpcTracing: GrpcTracing
private val grpcTracing: GrpcTracing,
private val clientSpansInterceptor: ClientInterceptor?,
private var maxMetadataSize: Int
) {
private val log = LoggerFactory.getLogger(GrpcUpstreams::class.java)

Expand All @@ -86,10 +89,15 @@ class GrpcUpstreams(
val chanelBuilder = NettyChannelBuilder.forAddress(host, port)
// some messages are very large. many of them in megabytes, some even in gigabytes (ex. ETH Traces)
.maxInboundMessageSize(Defaults.maxMessageSize)
.maxInboundMetadataSize(maxMetadataSize)
.enableRetry()
.intercept(grpcTracing.newClientInterceptor())
.executor(grpcExecutor)
.maxRetryAttempts(3)
clientSpansInterceptor?.let {
chanelBuilder.intercept(it)
log.info("Collect spans is enabled")
}
if (auth != null && StringUtils.isNotEmpty(auth.ca)) {
chanelBuilder
.useTransportSecurity()
Expand Down
Loading

0 comments on commit 6b0e201

Please sign in to comment.