From d36610cf92875060b2cc72a959f5bbeea9338d83 Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Wed, 1 Mar 2023 15:20:07 +0800 Subject: [PATCH 1/9] add opentelemetry package --- go.mod | 6 +++++- go.sum | 11 +++++++++++ server/server.go | 4 ++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index f578e40b7a..c48a0e2b58 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/prometheus/client_golang v1.13.0 github.com/ryanuber/columnize v2.1.2+incompatible github.com/spf13/cobra v1.5.0 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.8.2 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/umbracle/go-eth-bn256 v0.0.0-20190607160430-b36caf4e0f6b github.com/umbracle/go-web3 v0.0.0-20220224145938-aaa1038c1b69 @@ -79,9 +79,13 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.18.3 // indirect github.com/aws/smithy-go v1.13.5 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect github.com/mattn/go-pointer v0.0.1 // indirect github.com/multiformats/go-multicodec v0.5.0 // indirect + go.opentelemetry.io/otel v1.14.0 // indirect + go.opentelemetry.io/otel/trace v1.14.0 // indirect ) require ( diff --git a/go.sum b/go.sum index ff37ab356d..e32e78ac24 100644 --- a/go.sum +++ b/go.sum @@ -243,7 +243,9 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod 
h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -311,6 +313,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -792,6 +795,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -802,6 +807,8 @@ github.com/stretchr/testify v1.7.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= @@ -846,7 +853,11 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/otel v1.6.3/go.mod h1:7BgNga5fNlF/iZjG06hM3yofffp0ofKCDwSXx1GC4dI= +go.opentelemetry.io/otel v1.14.0 h1:/79Huy8wbf5DnIPhemGB+zEPVwnN6fuQybr/SRXa6hM= +go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU= go.opentelemetry.io/otel/trace v1.6.3/go.mod h1:GNJQusJlUgZl9/TQBPKU/Y/ty+0iVB5fjhKeJGZPGFs= +go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M= +go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/server/server.go b/server/server.go index 0e8c22b986..1aaee64d21 100644 --- a/server/server.go +++ b/server/server.go @@ -34,6 +34,7 @@ 
import ( "github.com/hashicorp/go-hclog" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel" "google.golang.org/grpc" ) @@ -152,6 +153,9 @@ func newLevelDBBuilder(logger hclog.Logger, config *Config, path string) kvdb.Le // NewServer creates a new Minimal server, using the passed in configuration func NewServer(config *Config) (*Server, error) { + _, span := otel.Tracer("server").Start(context.Background(), "NewServer") + defer span.End() + logger, err := newLoggerFromConfig(config) if err != nil { return nil, fmt.Errorf("could not setup new logger instance, %w", err) From 65c68fbfd04566c8179cb866f6081b414a2d2c8e Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Mon, 6 Mar 2023 13:03:50 +0800 Subject: [PATCH 2/9] provider init --- command/default.go | 2 + command/helper/helper.go | 23 ++++++++++++ command/server/config.go | 2 + command/server/server.go | 7 ++++ go.mod | 2 + go.sum | 4 ++ helper/common/common.go | 39 +++++++++++++++++++ helper/common/common_test.go | 73 ++++++++++++++++++++++++++++++++++++ helper/telemetry/jaeger.go | 55 +++++++++++++++++++++++++++ jsonrpc/jsonrpc.go | 4 +- server/config.go | 1 + 11 files changed, 210 insertions(+), 2 deletions(-) create mode 100644 helper/common/common_test.go create mode 100644 helper/telemetry/jaeger.go diff --git a/command/default.go b/command/default.go index 3a60560dbe..ce9091f96e 100644 --- a/command/default.go +++ b/command/default.go @@ -19,6 +19,8 @@ const ( GraphQLAddressFlag = "graphql-address" PprofFlag = "pprof" PprofAddressFlag = "pprof-address" + JaegerFlag = "jaeger" + JaegerAddressFlag = "jaeger-address" ) // Legacy flag that needs to be present to preserve backwards diff --git a/command/helper/helper.go b/command/helper/helper.go index d82c328a89..6d71e95127 100644 --- a/command/helper/helper.go +++ b/command/helper/helper.go @@ -259,6 +259,29 @@ func GetPprofFlag(cmd *cobra.Command) bool { return 
v } +// RegustreTelemetryFlag registers the telemetry flags +func RegisterJaegerFlag(cmd *cobra.Command) { + cmd.PersistentFlags().Bool( + command.JaegerFlag, + false, + "enable the jaeger telemetry", + ) + + cmd.PersistentFlags().String( + command.JaegerAddressFlag, + fmt.Sprintf("http://%s:%d/api/traces", LocalHostBinding, server.DefaultJaegerPort), + "jaeger thrift protocol (HTTP POST to /api/traces)", + ) +} + +// GetJaegerFlag extracts the telemetry flag +func GetJaegerFlag(cmd *cobra.Command) (bool, string) { + v, _ := cmd.Flags().GetBool(command.JaegerFlag) + addr, _ := cmd.Flags().GetString(command.JaegerAddressFlag) + + return v, addr +} + // ParseGraphQLAddress parses the passed in GraphQL address func ParseGraphQLAddress(graphqlAddress string) (*url.URL, error) { return url.ParseRequestURI(graphqlAddress) diff --git a/command/server/config.go b/command/server/config.go index 1c926ba300..ac3d6ac73f 100644 --- a/command/server/config.go +++ b/command/server/config.go @@ -43,6 +43,8 @@ type Config struct { type Telemetry struct { PrometheusAddr string `json:"prometheus_addr"` EnableIOTimer bool `json:"prometheus_enable_disk_io_timer"` + EnableJaeger bool `json:"enable_jaeger"` + JaegerURL string `json:"jaeger_url"` } // Network defines the network configuration params diff --git a/command/server/server.go b/command/server/server.go index b4387c80ea..e4a53c73ed 100644 --- a/command/server/server.go +++ b/command/server/server.go @@ -371,6 +371,7 @@ func setDevFlags(cmd *cobra.Command) { ) helper.RegisterPprofFlag(cmd) + helper.RegisterJaegerFlag(cmd) _ = cmd.Flags().MarkHidden(devIntervalFlag) } @@ -482,6 +483,12 @@ func runCommand(cmd *cobra.Command, _ []string) { // pprof flag params.rawConfig.EnablePprof = helper.GetPprofFlag(cmd) + // jaeger flag + if enable, jaegerURL := helper.GetJaegerFlag(cmd); enable { + params.rawConfig.Telemetry.EnableJaeger = enable + params.rawConfig.Telemetry.JaegerURL = jaegerURL + } + if err := 
runServerLoop(params.generateConfig(), outputter); err != nil { outputter.SetError(err) outputter.WriteOutput() diff --git a/go.mod b/go.mod index c48a0e2b58..b188396cca 100644 --- a/go.mod +++ b/go.mod @@ -85,6 +85,7 @@ require ( github.com/mattn/go-pointer v0.0.1 // indirect github.com/multiformats/go-multicodec v0.5.0 // indirect go.opentelemetry.io/otel v1.14.0 // indirect + go.opentelemetry.io/otel/sdk v1.14.0 // indirect go.opentelemetry.io/otel/trace v1.14.0 // indirect ) @@ -180,6 +181,7 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.44.0 // indirect github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect + go.opentelemetry.io/otel/exporters/jaeger v1.14.0 go.uber.org/multierr v1.8.0 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.7.0 // indirect diff --git a/go.sum b/go.sum index e32e78ac24..329b8f715f 100644 --- a/go.sum +++ b/go.sum @@ -855,6 +855,10 @@ go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/otel v1.6.3/go.mod h1:7BgNga5fNlF/iZjG06hM3yofffp0ofKCDwSXx1GC4dI= go.opentelemetry.io/otel v1.14.0 h1:/79Huy8wbf5DnIPhemGB+zEPVwnN6fuQybr/SRXa6hM= go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU= +go.opentelemetry.io/otel/exporters/jaeger v1.14.0 h1:CjbUNd4iN2hHmWekmOqZ+zSCU+dzZppG8XsV+A3oc8Q= +go.opentelemetry.io/otel/exporters/jaeger v1.14.0/go.mod h1:4Ay9kk5vELRrbg5z4cpP9EtmQRFap2Wb0woPG4lujZA= +go.opentelemetry.io/otel/sdk v1.14.0 h1:PDCppFRDq8A1jL9v6KMI6dYesaq+DFcDZvjsoGvxGzY= +go.opentelemetry.io/otel/sdk v1.14.0/go.mod h1:bwIC5TjrNG6QDCHNWvW4HLHtUQ4I+VQDsnjhvyZCALM= go.opentelemetry.io/otel/trace v1.6.3/go.mod h1:GNJQusJlUgZl9/TQBPKU/Y/ty+0iVB5fjhKeJGZPGFs= go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M= go.opentelemetry.io/otel/trace v1.14.0/go.mod 
h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= diff --git a/helper/common/common.go b/helper/common/common.go index be5590dd99..58faa52ecb 100644 --- a/helper/common/common.go +++ b/helper/common/common.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "net" "os" "os/signal" "path/filepath" @@ -216,3 +217,41 @@ func PadLeftOrTrim(bb []byte, size int) []byte { return tmp } + +// Substr returns a substring of the input string +func Substr(input string, start int, size int) string { + strRunes := []rune(input) + + if start < 0 { + start = 0 + } + + if start >= len(strRunes) { + return "" + } + + if (start + size) > len(strRunes) { + size = len(strRunes) - start + } + + return string(strRunes[start : start+size]) +} + +// GetOutboundIP returns the preferred outbound ip of this machine +func GetOutboundIP() (net.IP, error) { + // any public address will do + conn, err := net.Dial("udp", "1.1.1.1") + if err != nil { + return nil, err + } + defer conn.Close() + + address, ok := conn.LocalAddr().(*net.UDPAddr) + if !ok { + return nil, errors.New("cannot assert UDPAddr") + } + + ipaddress := address.IP + + return ipaddress, nil +} diff --git a/helper/common/common_test.go b/helper/common/common_test.go new file mode 100644 index 0000000000..0438ec3a95 --- /dev/null +++ b/helper/common/common_test.go @@ -0,0 +1,73 @@ +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_UnicodeSubstr(t *testing.T) { + textSimples := []string{ + "", + "hello world", + "你好世界", + "こんにちは世界", + "Привіт Світ", + } + + // start >= 0, size <= len(string) + { + results := []string{ + "", + "he", + "你好", + "こん", + "Пр", + } + + for i, text := range textSimples { + result := Substr(text, 0, 2) + + assert.Equal(t, result, results[i]) + } + } + + // start < 0, size <= len(string) + { + results := []string{ + "", + "he", + "你好", + "こん", + "Пр", + } + + for i, text := range textSimples { + result := Substr(text, -1, 2) + + assert.Equal(t, result, results[i]) + 
} + } + + // start >= 0, size > len(string) + { + results := textSimples + + for i, text := range textSimples { + result := Substr(text, 0, 255) + + assert.Equal(t, result, results[i]) + } + } + + // start < 0, size > len(string) + { + results := textSimples + + for i, text := range textSimples { + result := Substr(text, -1, 255) + + assert.Equal(t, result, results[i]) + } + } +} diff --git a/helper/telemetry/jaeger.go b/helper/telemetry/jaeger.go new file mode 100644 index 0000000000..b84e4aa338 --- /dev/null +++ b/helper/telemetry/jaeger.go @@ -0,0 +1,55 @@ +package telemetry + +import ( + "os" + + "github.com/dogechain-lab/dogechain/helper/common" + "github.com/dogechain-lab/dogechain/versioning" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/sdk/resource" + + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" +) + +// NewTraceProvider creates a new trace provider +func NewJaegerProvider(url string, service string) (*tracesdk.TracerProvider, error) { + hostname, err := os.Hostname() + if err != nil { + // get ip address + ip, err := common.GetOutboundIP() + if err != nil { + hostname = "unknown" + } else { + hostname = ip.String() + } + } + + // Create the Jaeger exporter + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) + if err != nil { + return nil, err + } + + tp := tracesdk.NewTracerProvider( + // Always be sure to batch in production. + tracesdk.WithBatcher(exp), + // Record information about this application in a Resource. 
+ tracesdk.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(service), + attribute.String("hostname", hostname), + attribute.String("version", versioning.Version), + attribute.String("commit", common.Substr(versioning.Commit, 0, 8)), + attribute.String("buildTime", versioning.BuildTime), + )), + ) + + // Register our TracerProvider as the global so any imported + // instrumentation in the future will default to using it. + otel.SetTracerProvider(tp) + + return tp, nil +} diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index a3906e8eb0..4f20d26675 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -73,8 +73,8 @@ type Config struct { EnableWS bool PriceLimit uint64 EnablePProf bool // whether pprof enable or not - - Metrics *Metrics + EnableJaeger bool // whether jaeger enable or not + Metrics *Metrics } // NewJSONRPC returns the JSONRPC http server diff --git a/server/config.go b/server/config.go index e85a4d31ce..c564a0f42a 100644 --- a/server/config.go +++ b/server/config.go @@ -14,6 +14,7 @@ const DefaultGRPCPort int = 9632 const DefaultJSONRPCPort int = 8545 const DefaultGraphQLPort int = 9898 const DefaultPprofPort int = 6060 +const DefaultJaegerPort int = 14268 // Config is used to parametrize the minimal client type Config struct { From 19e962764a340f02da627b7575f74c221f83ab34 Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Tue, 7 Mar 2023 10:37:19 +0800 Subject: [PATCH 3/9] first version for the tracking interface --- helper/telemetry/interface.go | 44 +++++++++++ helper/telemetry/jaeger.go | 125 +++++++++++++++++++++++++++++-- helper/telemetry/nil_provider.go | 81 ++++++++++++++++++++ helper/telemetry/type_convert.go | 42 +++++++++++ 4 files changed, 287 insertions(+), 5 deletions(-) create mode 100644 helper/telemetry/interface.go create mode 100644 helper/telemetry/nil_provider.go create mode 100644 helper/telemetry/type_convert.go diff --git a/helper/telemetry/interface.go 
b/helper/telemetry/interface.go new file mode 100644 index 0000000000..06e67891e7 --- /dev/null +++ b/helper/telemetry/interface.go @@ -0,0 +1,44 @@ +package telemetry + +import ( + "context" + + "go.opentelemetry.io/otel/codes" +) + +type Span interface { + // SetAttribute sets an attribute (base type) + SetAttribute(label string, value interface{}) + + // SetAttributes sets attributes + SetAttributes(attributes map[string]interface{}) + + // SetStatus sets the status + SetStatus(code codes.Code, info string) + + // SetError sets the error + SetError(err error) + + // End ends the span + End() + + // context returns the span context + context() context.Context +} + +// Tracer provides a tracer +type Tracer interface { + // Start starts a new span + Start(name string) Span + + // StartWithParent starts a new span with a parent + StartWithParent(parent Span, name string) Span +} + +type TracerProvider interface { + // NewTracer creates a new tracer + NewTracer(namespace string) Tracer + + // Shutdown shuts down the tracer provider + Shutdown() error +} diff --git a/helper/telemetry/jaeger.go b/helper/telemetry/jaeger.go index b84e4aa338..e88b447eda 100644 --- a/helper/telemetry/jaeger.go +++ b/helper/telemetry/jaeger.go @@ -1,21 +1,24 @@ package telemetry import ( + "context" "os" "github.com/dogechain-lab/dogechain/helper/common" "github.com/dogechain-lab/dogechain/versioning" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/jaeger" "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/trace" tracesdk "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" ) -// NewTraceProvider creates a new trace provider -func NewJaegerProvider(url string, service string) (*tracesdk.TracerProvider, error) { +// newJaegerProvider creates a new jaeger provider +func newJaegerProvider(url string, service string) (*tracesdk.TracerProvider, error) { 
hostname, err := os.Hostname() if err != nil { // get ip address @@ -47,9 +50,121 @@ func NewJaegerProvider(url string, service string) (*tracesdk.TracerProvider, er )), ) - // Register our TracerProvider as the global so any imported - // instrumentation in the future will default to using it. + return tp, nil +} + +// jaegerSpan +type jaegerSpan struct { + // span + span trace.Span + + // context + ctx context.Context +} + +// SetAttribute sets an attribute +func (s *jaegerSpan) SetAttribute(key string, value interface{}) { + s.span.SetAttributes(attribute.KeyValue{ + Key: attribute.Key(key), + Value: convertTypeToAttribute(value), + }) +} + +// SetAttributes sets attributes +func (s *jaegerSpan) SetAttributes(attributes map[string]interface{}) { + kvs := make([]attribute.KeyValue, 0, len(attributes)) + for key, value := range attributes { + kvs = append(kvs, attribute.KeyValue{ + Key: attribute.Key(key), + Value: convertTypeToAttribute(value), + }) + } + + s.span.SetAttributes(kvs...) 
+} + +func (s *jaegerSpan) SetStatus(code codes.Code, info string) { + s.span.SetStatus(code, info) +} + +// SetError sets the error +func (s *jaegerSpan) SetError(err error) { + s.span.RecordError(err) +} + +// End ends the span +func (s *jaegerSpan) End() { + s.span.End() +} + +// context returns the span context +func (s *jaegerSpan) context() context.Context { + return s.ctx +} + +// jaegerTracer +type jaegerTracer struct { + // context + context context.Context + + // tracer + tracer trace.Tracer +} + +// Start starts a new span +func (t *jaegerTracer) Start(name string) Span { + ctx, span := t.tracer.Start(t.context, name) + + return &jaegerSpan{ + span: span, + ctx: ctx, + } +} + +// StartWithParent starts a new span with a parent +func (t *jaegerTracer) StartWithParent(parent Span, name string) Span { + ctx, span := t.tracer.Start(parent.context(), name) + + return &jaegerSpan{ + span: span, + ctx: ctx, + } +} + +// jaegerTracerProvider +type jaegerTracerProvider struct { + // context + context context.Context + + // provider + provider *tracesdk.TracerProvider +} + +// NewTracer creates a new tracer +func (p *jaegerTracerProvider) NewTracer(namespace string) Tracer { + return &jaegerTracer{ + context: p.context, + tracer: p.provider.Tracer(namespace), + } +} + +// Shutdown shuts down the tracer provider +func (p *jaegerTracerProvider) Shutdown() error { + return p.provider.Shutdown(p.context) +} + +// NewTracerProvider creates a new trace provider +func NewTracerProvider(url string, service string) (TracerProvider, error) { + tp, err := newJaegerProvider(url, service) + if err != nil { + return nil, err + } + + // Register TracerProvider otel.SetTracerProvider(tp) - return tp, nil + return &jaegerTracerProvider{ + context: context.Background(), + provider: tp, + }, nil } diff --git a/helper/telemetry/nil_provider.go b/helper/telemetry/nil_provider.go new file mode 100644 index 0000000000..231b575f64 --- /dev/null +++ b/helper/telemetry/nil_provider.go @@ 
-0,0 +1,81 @@ +package telemetry + +import ( + "context" + + "go.opentelemetry.io/otel/codes" +) + +// NilSpan +type NilSpan struct { + ctx context.Context +} + +// SetAttribute sets an attribute +func (s *NilSpan) SetAttribute(key string, value interface{}) { +} + +// SetAttributes sets attributes +func (s *NilSpan) SetAttributes(attributes map[string]interface{}) { +} + +func (s *NilSpan) SetStatus(code codes.Code, info string) { +} + +// SetError sets the error +func (s *NilSpan) SetError(err error) { +} + +// End ends the span +func (s *NilSpan) End() { +} + +// context returns the span context +func (s *NilSpan) context() context.Context { + return s.ctx +} + +// NilTracer +type NilTracer struct { + // context + context context.Context +} + +// Start starts a new span +func (t *NilTracer) Start(name string) Span { + return &NilSpan{ + ctx: t.context, + } +} + +// StartWithParent starts a new span with a parent +func (t *NilTracer) StartWithParent(parent Span, name string) Span { + return &NilSpan{ + ctx: t.context, + } +} + +// NilTracerProvider +type NilTracerProvider struct { + // context + context context.Context +} + +// NewTracer creates a new tracer +func (p *NilTracerProvider) NewTracer(namespace string) Tracer { + return &NilTracer{ + context: p.context, + } +} + +// Shutdown shuts down the tracer provider +func (p *NilTracerProvider) Shutdown() error { + return nil +} + +// NewNilTracerProvider creates a new trace provider +func NewNilTracerProvider(url string, service string) (TracerProvider, error) { + return &NilTracerProvider{ + context: context.Background(), + }, nil +} diff --git a/helper/telemetry/type_convert.go b/helper/telemetry/type_convert.go new file mode 100644 index 0000000000..88035022d1 --- /dev/null +++ b/helper/telemetry/type_convert.go @@ -0,0 +1,42 @@ +package telemetry + +import ( + "go.opentelemetry.io/otel/attribute" + + "fmt" +) + +func convertTypeToAttribute(value interface{}) attribute.Value { + switch v := value.(type) { + 
case string: + return attribute.StringValue(v) + case int: + return attribute.IntValue(v) + case int8: + return attribute.IntValue(int(v)) + case int16: + return attribute.IntValue(int(v)) + case int32: + return attribute.IntValue(int(v)) + case int64: + return attribute.Int64Value(v) + case uint: + return attribute.Int64Value(int64(v)) + case uint8: + return attribute.Int64Value(int64(v)) + case uint16: + return attribute.Int64Value(int64(v)) + case uint32: + return attribute.Int64Value(int64(v)) + case uint64: + return attribute.Int64Value(int64(v)) + case float32: + return attribute.Float64Value(float64(v)) + case float64: + return attribute.Float64Value(v) + case bool: + return attribute.BoolValue(v) + default: + return attribute.StringValue(fmt.Sprintf("%v", v)) + } +} From 7b3152362833d3f9551f3dfb6f8ad393110d4fa9 Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Tue, 7 Mar 2023 12:07:46 +0800 Subject: [PATCH 4/9] tracer interface init --- command/server/config.go | 1 + command/server/params.go | 2 + helper/telemetry/interface.go | 27 +++++++- helper/telemetry/jaeger.go | 29 +++++++-- helper/telemetry/nil_provider.go | 53 +++++++++------- network/e2e_testing.go | 5 +- network/peer_connect.go | 71 +++++++++++++++++++++ network/server.go | 106 ++++++++++--------------------- network/server_identity.go | 3 + network/testing/testing.go | 19 ++++++ server/config.go | 2 + server/server.go | 37 ++++++++++- 12 files changed, 245 insertions(+), 110 deletions(-) create mode 100644 network/peer_connect.go diff --git a/command/server/config.go b/command/server/config.go index ac3d6ac73f..c1b2a0780c 100644 --- a/command/server/config.go +++ b/command/server/config.go @@ -93,6 +93,7 @@ func DefaultConfig() *Config { }, Telemetry: &Telemetry{ EnableIOTimer: false, + EnableJaeger: false, }, ShouldSeal: false, TxPool: &TxPool{ diff --git a/command/server/params.go b/command/server/params.go index 4e64d7de90..f48fe4b2cf 100644 --- a/command/server/params.go +++ 
b/command/server/params.go @@ -226,6 +226,8 @@ func (p *serverParams) generateConfig() *server.Config { Telemetry: &server.Telemetry{ PrometheusAddr: p.prometheusAddress, EnableIOMetrics: p.prometheusIOMetrics, + EnableJaeger: p.rawConfig.Telemetry.EnableJaeger, + JaegerURL: p.rawConfig.Telemetry.JaegerURL, }, Network: &network.Config{ NoDiscover: p.rawConfig.Network.NoDiscover, diff --git a/helper/telemetry/interface.go b/helper/telemetry/interface.go index 06e67891e7..d0d07ff257 100644 --- a/helper/telemetry/interface.go +++ b/helper/telemetry/interface.go @@ -6,6 +6,26 @@ import ( "go.opentelemetry.io/otel/codes" ) +type contextLabel string +type contextValue string + +const ( + ContextNamespace contextLabel = "telemetry" +) + +type Code codes.Code + +const ( + // Unset is the default status code + Unset Code = Code(codes.Unset) + + // Error indicates the operation contains an error + Error Code = Code(codes.Error) + + // Ok indicates operation has been validated by an Application developers + Ok Code = Code(codes.Ok) +) + type Span interface { // SetAttribute sets an attribute (base type) SetAttribute(label string, value interface{}) @@ -13,8 +33,11 @@ type Span interface { // SetAttributes sets attributes SetAttributes(attributes map[string]interface{}) + // AddEvent adds an event + AddEvent(name string, attributes map[string]interface{}) + // SetStatus sets the status - SetStatus(code codes.Code, info string) + SetStatus(code Code, info string) // SetError sets the error SetError(err error) @@ -40,5 +63,5 @@ type TracerProvider interface { NewTracer(namespace string) Tracer // Shutdown shuts down the tracer provider - Shutdown() error + Shutdown(context.Context) error } diff --git a/helper/telemetry/jaeger.go b/helper/telemetry/jaeger.go index e88b447eda..83d7bd4b66 100644 --- a/helper/telemetry/jaeger.go +++ b/helper/telemetry/jaeger.go @@ -17,6 +17,10 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.17.0" ) +const ( + JaegerContextName contextValue 
= "jaeger" +) + // newJaegerProvider creates a new jaeger provider func newJaegerProvider(url string, service string) (*tracesdk.TracerProvider, error) { hostname, err := os.Hostname() @@ -48,6 +52,7 @@ func newJaegerProvider(url string, service string) (*tracesdk.TracerProvider, er attribute.String("commit", common.Substr(versioning.Commit, 0, 8)), attribute.String("buildTime", versioning.BuildTime), )), + tracesdk.WithSampler(tracesdk.AlwaysSample()), ) return tp, nil @@ -83,8 +88,20 @@ func (s *jaegerSpan) SetAttributes(attributes map[string]interface{}) { s.span.SetAttributes(kvs...) } -func (s *jaegerSpan) SetStatus(code codes.Code, info string) { - s.span.SetStatus(code, info) +func (s *jaegerSpan) AddEvent(name string, attributes map[string]interface{}) { + kvs := make([]attribute.KeyValue, 0, len(attributes)) + for key, value := range attributes { + kvs = append(kvs, attribute.KeyValue{ + Key: attribute.Key(key), + Value: convertTypeToAttribute(value), + }) + } + + s.span.AddEvent(name, trace.WithAttributes(kvs...)) +} + +func (s *jaegerSpan) SetStatus(code Code, info string) { + s.span.SetStatus(codes.Code(code), info) } // SetError sets the error @@ -149,12 +166,12 @@ func (p *jaegerTracerProvider) NewTracer(namespace string) Tracer { } // Shutdown shuts down the tracer provider -func (p *jaegerTracerProvider) Shutdown() error { - return p.provider.Shutdown(p.context) +func (p *jaegerTracerProvider) Shutdown(ctx context.Context) error { + return p.provider.Shutdown(ctx) } // NewTracerProvider creates a new trace provider -func NewTracerProvider(url string, service string) (TracerProvider, error) { +func NewTracerProvider(ctx context.Context, url string, service string) (TracerProvider, error) { tp, err := newJaegerProvider(url, service) if err != nil { return nil, err @@ -164,7 +181,7 @@ func NewTracerProvider(url string, service string) (TracerProvider, error) { otel.SetTracerProvider(tp) return &jaegerTracerProvider{ - context: context.Background(), + 
context: context.WithValue(ctx, JaegerContextName, JaegerContextName), provider: tp, }, nil } diff --git a/helper/telemetry/nil_provider.go b/helper/telemetry/nil_provider.go index 231b575f64..5b64634957 100644 --- a/helper/telemetry/nil_provider.go +++ b/helper/telemetry/nil_provider.go @@ -2,80 +2,85 @@ package telemetry import ( "context" +) - "go.opentelemetry.io/otel/codes" +const ( + NilContextName contextValue = "nil" ) -// NilSpan -type NilSpan struct { +// nilSpan +type nilSpan struct { ctx context.Context } // SetAttribute sets an attribute -func (s *NilSpan) SetAttribute(key string, value interface{}) { +func (s *nilSpan) SetAttribute(key string, value interface{}) { } // SetAttributes sets attributes -func (s *NilSpan) SetAttributes(attributes map[string]interface{}) { +func (s *nilSpan) SetAttributes(attributes map[string]interface{}) { +} + +func (s *nilSpan) AddEvent(name string, attributes map[string]interface{}) { } -func (s *NilSpan) SetStatus(code codes.Code, info string) { +func (s *nilSpan) SetStatus(code Code, info string) { } // SetError sets the error -func (s *NilSpan) SetError(err error) { +func (s *nilSpan) SetError(err error) { } // End ends the span -func (s *NilSpan) End() { +func (s *nilSpan) End() { } // context returns the span context -func (s *NilSpan) context() context.Context { +func (s *nilSpan) context() context.Context { return s.ctx } -// NilTracer -type NilTracer struct { +// nilTracer +type nilTracer struct { // context context context.Context } // Start starts a new span -func (t *NilTracer) Start(name string) Span { - return &NilSpan{ +func (t *nilTracer) Start(name string) Span { + return &nilSpan{ ctx: t.context, } } // StartWithParent starts a new span with a parent -func (t *NilTracer) StartWithParent(parent Span, name string) Span { - return &NilSpan{ +func (t *nilTracer) StartWithParent(parent Span, name string) Span { + return &nilSpan{ ctx: t.context, } } -// NilTracerProvider -type NilTracerProvider struct { +// 
nilTracerProvider +type nilTracerProvider struct { // context context context.Context } // NewTracer creates a new tracer -func (p *NilTracerProvider) NewTracer(namespace string) Tracer { - return &NilTracer{ +func (p *nilTracerProvider) NewTracer(namespace string) Tracer { + return &nilTracer{ context: p.context, } } // Shutdown shuts down the tracer provider -func (p *NilTracerProvider) Shutdown() error { +func (p *nilTracerProvider) Shutdown(ctx context.Context) error { return nil } // NewNilTracerProvider creates a new trace provider -func NewNilTracerProvider(url string, service string) (TracerProvider, error) { - return &NilTracerProvider{ - context: context.Background(), - }, nil +func NewNilTracerProvider(ctx context.Context) TracerProvider { + return &nilTracerProvider{ + context: context.WithValue(ctx, ContextNamespace, NilContextName), + } } diff --git a/network/e2e_testing.go b/network/e2e_testing.go index ab7800c8ce..16965fd69d 100644 --- a/network/e2e_testing.go +++ b/network/e2e_testing.go @@ -13,6 +13,7 @@ import ( "github.com/dogechain-lab/dogechain/chain" "github.com/dogechain-lab/dogechain/helper/common" + "github.com/dogechain-lab/dogechain/helper/telemetry" "github.com/dogechain-lab/dogechain/helper/tests" "github.com/dogechain-lab/dogechain/secrets" "github.com/dogechain-lab/dogechain/secrets/local" @@ -334,7 +335,9 @@ func CreateServer(params *CreateServerParams) (*DefaultServer, error) { cfg.SecretsManager = secretsManager cfg.Metrics = NilMetrics() - server, err := newServer(params.Logger, cfg) + tracerProvider := telemetry.NewNilTracerProvider(context.Background()) + + server, err := newServer(params.Logger, tracerProvider.NewTracer("test"), cfg) if err != nil { return nil, err } diff --git a/network/peer_connect.go b/network/peer_connect.go new file mode 100644 index 0000000000..aa717a952e --- /dev/null +++ b/network/peer_connect.go @@ -0,0 +1,71 @@ +package network + +import ( + "github.com/dogechain-lab/dogechain/network/client" +
"github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" +) + +// PeerConnInfo holds the connection information about the peer +type PeerConnInfo struct { + Info peer.AddrInfo + + connDirections map[network.Direction]bool + protocolClient map[string]client.GrpcClientCloser +} + +// addConnDirection adds a connection direction +func (pci *PeerConnInfo) addConnDirection(direction network.Direction) { + pci.connDirections[direction] = true +} + +// removeConnDirection removes a connection direction +func (pci *PeerConnInfo) removeConnDirection(direction network.Direction) { + pci.connDirections[direction] = false +} + +// existsConnDirection reports whether a connection in the given direction exists +func (pci *PeerConnInfo) existsConnDirection(direction network.Direction) bool { + exist, ok := pci.connDirections[direction] + if !ok { + return false + } + + return exist +} + +func (pci *PeerConnInfo) noConnectionAvailable() bool { + // if all directions are false, return true + for _, v := range pci.connDirections { + if v { + return false + } + } + + return true +} + +// addProtocolClient adds a protocol stream +func (pci *PeerConnInfo) addProtocolClient(protocol string, stream client.GrpcClientCloser) { + pci.protocolClient[protocol] = stream +} + +// cleanProtocolStreams clean and closes all protocol stream +func (pci *PeerConnInfo) cleanProtocolStreams() []error { + errs := []error{} + + for _, clt := range pci.protocolClient { + if clt != nil { + errs = append(errs, clt.Close()) + } + } + + pci.protocolClient = make(map[string]client.GrpcClientCloser) + + return errs +} + +// getProtocolClient fetches the protocol stream, if any +func (pci *PeerConnInfo) getProtocolClient(protocol string) client.GrpcClientCloser { + return pci.protocolClient[protocol] +} diff --git a/network/server.go b/network/server.go index af8749378f..90c96b6465 100644 --- a/network/server.go +++ b/network/server.go @@ -7,6 +7,7 @@ import ( "sync" "time" +
"github.com/dogechain-lab/dogechain/helper/telemetry" "github.com/dogechain-lab/dogechain/network/client" "github.com/dogechain-lab/dogechain/network/common" "github.com/dogechain-lab/dogechain/network/dial" @@ -64,8 +65,10 @@ var ( ) type DefaultServer struct { - logger hclog.Logger // the logger - config *Config // the base networking server configuration + logger hclog.Logger // the logger + tracer telemetry.Tracer // the tracer for telemetry + + config *Config // the base networking server configuration closeCh chan struct{} // the channel used for closing the networking server closeWg sync.WaitGroup // the waitgroup used for closing the networking server @@ -103,20 +106,28 @@ type DefaultServer struct { } // NewServer returns a new instance of the networking server -func NewServer(logger hclog.Logger, config *Config) (Server, error) { - return newServer(logger, config) +func NewServer(logger hclog.Logger, tracer telemetry.Tracer, config *Config) (Server, error) { + return newServer(logger, tracer, config) } -func newServer(logger hclog.Logger, config *Config) (*DefaultServer, error) { +func newServer(logger hclog.Logger, tracer telemetry.Tracer, config *Config) (*DefaultServer, error) { logger = logger.Named("network") + span := tracer.Start("init") + defer span.End() + key, err := setupLibp2pKey(config.SecretsManager) if err != nil { return nil, err } + span.SetAttribute("listen_addr", config.Addr.String()) + span.SetAttribute("listen_port", config.Addr.Port) + listenAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.Addr.IP.String(), config.Addr.Port)) if err != nil { + span.SetError(err) + return nil, err } @@ -137,6 +148,8 @@ func newServer(logger hclog.Logger, config *Config) (*DefaultServer, error) { return addrs } + span.SetAttribute("max_peers", config.MaxPeers) + maxPeers := config.MaxInboundPeers + config.MaxOutboundPeers if maxPeers == 0 { return nil, fmt.Errorf("max peers is 0, please set MaxInboundPeers and MaxOutboundPeers 
greater than 0") @@ -183,6 +196,7 @@ func newServer(logger hclog.Logger, config *Config) (*DefaultServer, error) { srv := &DefaultServer{ logger: logger, + tracer: tracer, config: config, host: host, selfID: host.ID(), @@ -220,75 +234,6 @@ func newServer(logger hclog.Logger, config *Config) (*DefaultServer, error) { return srv, nil } -// HasFreeConnectionSlot checks if there are free connection slots in the specified direction [Thread safe] -func (s *DefaultServer) HasFreeConnectionSlot(direction network.Direction) bool { - return s.connectionCounts.HasFreeConnectionSlot(direction) -} - -// PeerConnInfo holds the connection information about the peer -type PeerConnInfo struct { - Info peer.AddrInfo - - connDirections map[network.Direction]bool - protocolClient map[string]client.GrpcClientCloser -} - -// addConnDirection adds a connection direction -func (pci *PeerConnInfo) addConnDirection(direction network.Direction) { - pci.connDirections[direction] = true -} - -// removeConnDirection adds a connection direction -func (pci *PeerConnInfo) removeConnDirection(direction network.Direction) { - pci.connDirections[direction] = false -} - -// existsConnDirection returns the connection direction -func (pci *PeerConnInfo) existsConnDirection(direction network.Direction) bool { - exist, ok := pci.connDirections[direction] - if !ok { - return false - } - - return exist -} - -func (pci *PeerConnInfo) noConnectionAvailable() bool { - // if all directions are false, return false - for _, v := range pci.connDirections { - if v { - return false - } - } - - return true -} - -// addProtocolClient adds a protocol stream -func (pci *PeerConnInfo) addProtocolClient(protocol string, stream client.GrpcClientCloser) { - pci.protocolClient[protocol] = stream -} - -// cleanProtocolStreams clean and closes all protocol stream -func (pci *PeerConnInfo) cleanProtocolStreams() []error { - errs := []error{} - - for _, clt := range pci.protocolClient { - if clt != nil { - errs = append(errs, 
clt.Close()) - } - } - - pci.protocolClient = make(map[string]client.GrpcClientCloser) - - return errs -} - -// getProtocolClient fetches the protocol stream, if any -func (pci *PeerConnInfo) getProtocolClient(protocol string) client.GrpcClientCloser { - return pci.protocolClient[protocol] -} - // setupLibp2pKey is a helper method for setting up the networking private key func setupLibp2pKey(secretsManager secrets.SecretsManager) (crypto.PrivKey, error) { var key crypto.PrivKey @@ -321,6 +266,9 @@ func setupLibp2pKey(secretsManager secrets.SecretsManager) (crypto.PrivKey, erro // Start starts the networking services func (s *DefaultServer) Start() error { + span := s.tracer.Start("start") + defer span.End() + s.logger.Info("LibP2P server running", "addr", common.AddrInfoToString(s.AddrInfo())) if setupErr := s.setupIdentity(); setupErr != nil { @@ -602,6 +550,16 @@ func (s *DefaultServer) Peers() []*PeerConnInfo { return peers } +// HasFreeConnectionSlot checks if there are free connection slots in the specified direction [Thread safe] +func (s *DefaultServer) HasFreeConnectionSlot(direction network.Direction) bool { + return s.connectionCounts.HasFreeConnectionSlot(direction) +} + +// GetTracer returns the tracer instance +func (s *DefaultServer) GetTracer() telemetry.Tracer { + return s.tracer +} + // hasPeer checks if the peer is present in the peers list [Thread safe] func (s *DefaultServer) HasPeer(peerID peer.ID) bool { s.peersLock.RLock() diff --git a/network/server_identity.go b/network/server_identity.go index fc4f2145d7..f4a3f80511 100644 --- a/network/server_identity.go +++ b/network/server_identity.go @@ -110,6 +110,9 @@ func (s *DefaultServer) setupIdentity() error { return fmt.Errorf("identity service already initialized") } + span := s.tracer.Start("setupIdentity") + defer span.End() + // Create an instance of the identity service s.identity = identity.NewIdentityService( s, diff --git a/network/testing/testing.go b/network/testing/testing.go index 
1856b7d78d..7d01a2ba7a 100644 --- a/network/testing/testing.go +++ b/network/testing/testing.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/dogechain-lab/dogechain/helper/telemetry" "github.com/dogechain-lab/dogechain/network/client" "github.com/dogechain-lab/dogechain/network/event" "github.com/dogechain-lab/dogechain/network/proto" @@ -42,6 +43,9 @@ type MockNetworkingServer struct { isBootnodeFn isBootnodeDelegate isStaticPeerFn isStaticPeerDelegate hasPeerFn hasPeerDelegate + + // tracer + getTraceFn getTraceDelegate } func NewMockNetworkingServer() *MockNetworkingServer { @@ -86,6 +90,9 @@ type isBootnodeDelegate func(peer.ID) bool type isStaticPeerDelegate func(peer.ID) bool type hasPeerDelegate func(peer.ID) bool +// tracer +type getTraceDelegate func() telemetry.Tracer + func (m *MockNetworkingServer) NewIdentityClient(peerID peer.ID) (client.IdentityClient, error) { if m.newIdentityClientFn != nil { return m.newIdentityClientFn(peerID) @@ -278,6 +285,18 @@ func (m *MockNetworkingServer) HookHasPeer(fn hasPeerDelegate) { m.hasPeerFn = fn } +func (m *MockNetworkingServer) HookGetTracer(fn getTraceDelegate) { + m.getTraceFn = fn +} + +func (m *MockNetworkingServer) GetTracer() telemetry.Tracer { + if m.getTraceFn != nil { + return m.getTraceFn() + } + + return nil +} + // MockIdentityClient mocks an identity client (other peer in the communication) type MockIdentityClient struct { // Hooks that the test can set diff --git a/server/config.go b/server/config.go index c564a0f42a..cc7297f698 100644 --- a/server/config.go +++ b/server/config.go @@ -66,6 +66,8 @@ type LeveldbOptions struct { type Telemetry struct { PrometheusAddr *net.TCPAddr EnableIOMetrics bool + EnableJaeger bool + JaegerURL string } // JSONRPC holds the config details for the JSON-RPC server diff --git a/server/server.go b/server/server.go index 1aaee64d21..416be64b23 100644 --- a/server/server.go +++ b/server/server.go @@ -21,6 +21,7 @@ import ( 
"github.com/dogechain-lab/dogechain/helper/common" "github.com/dogechain-lab/dogechain/helper/kvdb" "github.com/dogechain-lab/dogechain/helper/progress" + "github.com/dogechain-lab/dogechain/helper/telemetry" "github.com/dogechain-lab/dogechain/jsonrpc" "github.com/dogechain-lab/dogechain/network" "github.com/dogechain-lab/dogechain/secrets" @@ -40,8 +41,13 @@ import ( // Minimal is the central manager of the blockchain client type Server struct { - logger hclog.Logger - config *Config + ctx context.Context // the context for the server + + logger hclog.Logger + tracerProvider telemetry.TracerProvider // the tracer for telemetry + + config *Config + state state.State stateStorage itrie.Storage @@ -163,6 +169,7 @@ func NewServer(config *Config) (*Server, error) { m := &Server{ logger: logger, + ctx: context.Background(), config: config, chain: config.Chain, grpcServer: grpc.NewServer( @@ -186,6 +193,22 @@ func NewServer(config *Config) (*Server, error) { m.serverMetrics = metricProvider("dogechain", config.Chain.Name, false, false) } + if config.Telemetry.EnableJaeger { + var err error + + m.tracerProvider, err = telemetry.NewTracerProvider( + m.ctx, + config.Telemetry.JaegerURL, + loggerDomainName, + ) + + if err != nil { + return nil, fmt.Errorf("failed to set up the tracer: %w", err) + } + } else { + m.tracerProvider = telemetry.NewNilTracerProvider(m.ctx) + } + // Set up the secrets manager if err := m.setupSecretsManager(); err != nil { return nil, fmt.Errorf("failed to set up the secrets manager: %w", err) @@ -199,7 +222,9 @@ func NewServer(config *Config) (*Server, error) { netConfig.SecretsManager = m.secretsManager netConfig.Metrics = m.serverMetrics.network - network, err := network.NewServer(logger, netConfig) + trace := m.tracerProvider.NewTracer("network") + + network, err := network.NewServer(logger, trace, netConfig) if err != nil { return nil, err } @@ -652,6 +677,12 @@ func (s *Server) Close() { s.logger.Error("Prometheus server shutdown error", 
err) } } + + if s.tracerProvider != nil { + if err := s.tracerProvider.Shutdown(context.Background()); err != nil { + s.logger.Error("Tracer provider shutdown error", err) + } + } } // Entry is a backend configuration entry From d5f041abef6b376fc83711296b16a821c8290630 Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Tue, 7 Mar 2023 16:49:24 +0800 Subject: [PATCH 5/9] network telemetry --- helper/telemetry/interface.go | 21 ++++++++---- helper/telemetry/jaeger.go | 42 ++++++++++++++++++++---- helper/telemetry/nil_provider.go | 40 +++++++++-------------- network/discovery/discovery.go | 19 ++++++++++- network/e2e_testing.go | 2 +- network/event/peer_event.go | 9 ++++- network/identity/identity.go | 38 ++++++++++++++++++---- network/peer_connect.go | 20 ++++++++++-- network/server.go | 56 +++++++++++++++++++++++++++----- network/server_discovery.go | 3 ++ network/server_identity.go | 2 +- network/server_test.go | 2 +- server/server.go | 2 +- 13 files changed, 196 insertions(+), 60 deletions(-) diff --git a/helper/telemetry/interface.go b/helper/telemetry/interface.go index d0d07ff257..308509e8f8 100644 --- a/helper/telemetry/interface.go +++ b/helper/telemetry/interface.go @@ -4,6 +4,7 @@ import ( "context" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) type contextLabel string @@ -27,26 +28,29 @@ const ( ) type Span interface { - // SetAttribute sets an attribute (base type) + // SetAttribute set attribute (base type) SetAttribute(label string, value interface{}) - // SetAttributes sets attributes + // SetAttributes set attributes SetAttributes(attributes map[string]interface{}) // AddEvent adds an event AddEvent(name string, attributes map[string]interface{}) - // SetStatus sets the status + // SetStatus set status SetStatus(code Code, info string) - // SetError sets the error + // SetError set error SetError(err error) // End ends the span End() - // context returns the span context - context() context.Context + // 
SpanContext returns the span context + SpanContext() trace.SpanContext + + // Context returns the context.Context (span wrapped) + Context() context.Context } // Tracer provides a tracer @@ -55,7 +59,10 @@ type Tracer interface { Start(name string) Span // StartWithParent starts a new span with a parent - StartWithParent(parent Span, name string) Span + StartWithParent(parent trace.SpanContext, name string) Span + + // StartWithParentFromContext starts a new span with a parent from context + StartWithParentFromContext(ctx context.Context, name string) Span } type TracerProvider interface { diff --git a/helper/telemetry/jaeger.go b/helper/telemetry/jaeger.go index 83d7bd4b66..1f2d805b49 100644 --- a/helper/telemetry/jaeger.go +++ b/helper/telemetry/jaeger.go @@ -114,9 +114,16 @@ func (s *jaegerSpan) End() { s.span.End() } -// context returns the span context -func (s *jaegerSpan) context() context.Context { - return s.ctx +// SpanContext returns the span context +func (s *jaegerSpan) SpanContext() trace.SpanContext { + return s.span.SpanContext() +} + +// Context returns a context.Context carrying the span context +func (s *jaegerSpan) Context() context.Context { + ctx := trace.ContextWithSpanContext(s.ctx, s.SpanContext()) + + return ctx } // jaegerTracer @@ -139,12 +146,35 @@ func (t *jaegerTracer) Start(name string) Span { } // StartWithParent starts a new span with a parent -func (t *jaegerTracer) StartWithParent(parent Span, name string) Span { - ctx, span := t.tracer.Start(parent.context(), name) +func (t *jaegerTracer) StartWithParent(parent trace.SpanContext, name string) Span { + // create a new span context config + spanContextConfig := trace.SpanContextConfig{ + TraceID: parent.TraceID(), + SpanID: parent.SpanID(), + TraceFlags: parent.TraceFlags(), + Remote: parent.IsRemote(), + } + spanContext := trace.NewSpanContext(spanContextConfig) + + // create a new context with the parent span context + ctx := trace.ContextWithSpanContext(t.context, spanContext) + + // create a new span
+ childContext, span := t.tracer.Start(ctx, name) return &jaegerSpan{ span: span, - ctx: ctx, + ctx: childContext, + } +} + +// StartWithParentFromContext starts a new span with a parent from the context +func (t *jaegerTracer) StartWithParentFromContext(ctx context.Context, name string) Span { + childContext, span := t.tracer.Start(ctx, name) + + return &jaegerSpan{ + span: span, + ctx: childContext, } } diff --git a/helper/telemetry/nil_provider.go b/helper/telemetry/nil_provider.go index 5b64634957..81b6dd2f6c 100644 --- a/helper/telemetry/nil_provider.go +++ b/helper/telemetry/nil_provider.go @@ -2,15 +2,12 @@ package telemetry import ( "context" -) -const ( - NilContextName contextValue = "nil" + "go.opentelemetry.io/otel/trace" ) // nilSpan type nilSpan struct { - ctx context.Context } // SetAttribute sets an attribute @@ -35,42 +32,39 @@ func (s *nilSpan) SetError(err error) { func (s *nilSpan) End() { } -// context returns the span context -func (s *nilSpan) context() context.Context { - return s.ctx +func (s *nilSpan) SpanContext() trace.SpanContext { + return trace.SpanContext{} +} + +func (s *nilSpan) Context() context.Context { + return context.Background() } // nilTracer type nilTracer struct { - // context - context context.Context } // Start starts a new span func (t *nilTracer) Start(name string) Span { - return &nilSpan{ - ctx: t.context, - } + return &nilSpan{} } // StartWithParent starts a new span with a parent -func (t *nilTracer) StartWithParent(parent Span, name string) Span { - return &nilSpan{ - ctx: t.context, - } +func (t *nilTracer) StartWithParent(parent trace.SpanContext, name string) Span { + return &nilSpan{} +} + +func (t *nilTracer) StartWithParentFromContext(ctx context.Context, name string) Span { + return &nilSpan{} } // nilTracerProvider type nilTracerProvider struct { - // context - context context.Context } // NewTracer creates a new tracer func (p *nilTracerProvider) NewTracer(namespace string) Tracer { - return &nilTracer{ 
- context: p.context, - } + return &nilTracer{} } // Shutdown shuts down the tracer provider @@ -80,7 +74,5 @@ func (p *nilTracerProvider) Shutdown(ctx context.Context) error { // NewNilTracerProvider creates a new trace provider func NewNilTracerProvider(ctx context.Context) TracerProvider { - return &nilTracerProvider{ - context: context.WithValue(ctx, ContextNamespace, NilContextName), - } + return &nilTracerProvider{} } diff --git a/network/discovery/discovery.go b/network/discovery/discovery.go index 70e367c6d4..c3b868d77a 100644 --- a/network/discovery/discovery.go +++ b/network/discovery/discovery.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/dogechain-lab/dogechain/helper/telemetry" "github.com/dogechain-lab/dogechain/network/client" "github.com/dogechain-lab/dogechain/network/common" "github.com/dogechain-lab/dogechain/network/event" @@ -86,6 +87,9 @@ type networkingServer interface { // PeerCount connection peer number PeerCount() int64 + + // GetTracer returns the tracer instance + GetTracer() telemetry.Tracer } // peerAddreStore is a struct that contains the peer address information @@ -227,13 +231,21 @@ func (d *DiscoveryService) HandleNetworkEvent(peerEvent *event.PeerEvent) { // if bootnode disconnects and shutdown, can use this reconnect to network peerID := peerEvent.PeerID + // create tracer span + span := d.baseServer.GetTracer().StartWithParent( + peerEvent.SpanContext, + "discovery.HandleNetworkEvent", + ) + defer span.End() + // identity service trigger PeerDialCompleted event switch peerEvent.Type { case event.PeerDialCompleted: // Add peer to the routing table and to our local peer table - _, err := d.routingTable.TryAddPeer(peerID, false, true) + exist, err := d.routingTable.TryAddPeer(peerID, false, true) if err != nil { d.logger.Error("failed to add peer to routing table", "err", err) + span.SetError(err) return } @@ -245,6 +257,11 @@ func (d *DiscoveryService) HandleNetworkEvent(peerEvent *event.PeerEvent) { // update last use 
time d.routingTable.UpdateLastUsefulAt(peerID, time.Now()) + + span.AddEvent("peer_added_to_routing_table", map[string]interface{}{ + "existed": exist, + "peerInfo": peerInfo, + }) } } diff --git a/network/e2e_testing.go b/network/e2e_testing.go index 16965fd69d..01870bb4a5 100644 --- a/network/e2e_testing.go +++ b/network/e2e_testing.go @@ -337,7 +337,7 @@ func CreateServer(params *CreateServerParams) (*DefaultServer, error) { tracerProvider := telemetry.NewNilTracerProvider(context.Background()) - server, err := newServer(params.Logger, tracerProvider.NewTracer("test"), cfg) + server, err := newServer(context.Background(), params.Logger, tracerProvider.NewTracer("test"), cfg) if err != nil { return nil, err } diff --git a/network/event/peer_event.go b/network/event/peer_event.go index 8be3e1d7cf..1fe2af6cd5 100644 --- a/network/event/peer_event.go +++ b/network/event/peer_event.go @@ -1,6 +1,10 @@ package event -import "github.com/libp2p/go-libp2p/core/peer" +import ( + "github.com/libp2p/go-libp2p/core/peer" + + "go.opentelemetry.io/otel/trace" +) type PeerEventType uint @@ -27,6 +31,9 @@ type PeerEvent struct { // Type is the type of the event Type PeerEventType + + // trace context + SpanContext trace.SpanContext } func (s PeerEventType) String() string { diff --git a/network/identity/identity.go b/network/identity/identity.go index 20f808f0f3..c968ed5237 100644 --- a/network/identity/identity.go +++ b/network/identity/identity.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/dogechain-lab/dogechain/helper/telemetry" "github.com/dogechain-lab/dogechain/network/client" "github.com/dogechain-lab/dogechain/network/event" "github.com/dogechain-lab/dogechain/network/proto" @@ -50,6 +51,9 @@ type networkingServer interface { // HasFreeConnectionSlot checks if there are available outbound connection slots [Thread safe] HasFreeConnectionSlot(direction network.Direction) bool + + // GetTracer returns the base networking server's tracer + GetTracer() 
telemetry.Tracer } // IdentityService is a networking service used to handle peer handshaking. @@ -86,6 +90,11 @@ func NewIdentityService( func (i *IdentityService) GetNotifyBundle() *network.NotifyBundle { return &network.NotifyBundle{ ConnectedF: func(net network.Network, conn network.Conn) { + tracer := i.baseServer.GetTracer() + + span := tracer.Start("identity.ConnectedF") + defer span.End() + peerID := conn.RemotePeer() direction := conn.Stat().Direction @@ -93,6 +102,8 @@ func (i *IdentityService) GetNotifyBundle() *network.NotifyBundle { if i.HasPendingStatus(peerID) { // handshake has already started + span.SetStatus(telemetry.Unset, "already pending") + return } @@ -110,18 +121,34 @@ func (i *IdentityService) GetNotifyBundle() *network.NotifyBundle { if !i.baseServer.HasFreeConnectionSlot(direction) { i.disconnectFromPeer(peerID, ErrNoAvailableSlots.Error()) + span.SetStatus(telemetry.Error, ErrNoAvailableSlots.Error()) return } i.addPendingStatus(peerID, direction) + span.AddEvent("pending status added", map[string]interface{}{ + "peer": peerID, + "direction": direction, + }) + + spanCtx := span.SpanContext() go func() { + span := tracer.StartWithParent(spanCtx, "identity.handleConnected") + defer span.End() + connectEvent := &event.PeerEvent{ - PeerID: peerID, - Type: event.PeerDialCompleted, + PeerID: peerID, + Type: event.PeerDialCompleted, + SpanContext: span.SpanContext(), } + span.SetAttributes(map[string]interface{}{ + "peer": peerID, + "direction": direction, + }) + if err := i.handleConnected(peerID, conn.Stat().Direction); err != nil { i.logger.Debug("identity check failed, disconnect peer", "peer", peerID) @@ -129,16 +156,15 @@ func (i *IdentityService) GetNotifyBundle() *network.NotifyBundle { i.disconnectFromPeer(peerID, err.Error()) i.logger.Debug("send PeerFailedToConnect event", "peer", peerID) + span.SetError(err) + connectEvent.Type = event.PeerFailedToConnect } i.removePendingStatus(peerID, direction) // Emit an adequate event - 
i.baseServer.EmitEvent(&event.PeerEvent{ - PeerID: connectEvent.PeerID, - Type: connectEvent.Type, - }) + i.baseServer.EmitEvent(connectEvent) }() }, } diff --git a/network/peer_connect.go b/network/peer_connect.go index aa717a952e..798a413bf6 100644 --- a/network/peer_connect.go +++ b/network/peer_connect.go @@ -1,6 +1,9 @@ package network import ( + "context" + + "github.com/dogechain-lab/dogechain/helper/telemetry" "github.com/dogechain-lab/dogechain/network/client" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -51,12 +54,23 @@ func (pci *PeerConnInfo) addProtocolClient(protocol string, stream client.GrpcCl } // cleanProtocolStreams clean and closes all protocol stream -func (pci *PeerConnInfo) cleanProtocolStreams() []error { +func (pci *PeerConnInfo) cleanProtocolStreams(ctx context.Context, trace telemetry.Tracer) []error { errs := []error{} + span := trace.StartWithParentFromContext(ctx, "cleanProtocolStreams") - for _, clt := range pci.protocolClient { + for protocolName, clt := range pci.protocolClient { if clt != nil { - errs = append(errs, clt.Close()) + err := clt.Close() + if err != nil { + span.AddEvent( + "close_error", + map[string]interface{}{ + "protocol": protocolName, + "error": err.Error(), + }) + } + + errs = append(errs, err) } } diff --git a/network/server.go b/network/server.go index 90c96b6465..24d1070777 100644 --- a/network/server.go +++ b/network/server.go @@ -65,6 +65,9 @@ var ( ) type DefaultServer struct { + // TODO: switch to context.Context implementation + // ctx context.Context // the context for the networking server + logger hclog.Logger // the logger tracer telemetry.Tracer // the tracer for telemetry @@ -106,14 +109,19 @@ type DefaultServer struct { } // NewServer returns a new instance of the networking server -func NewServer(logger hclog.Logger, tracer telemetry.Tracer, config *Config) (Server, error) { - return newServer(logger, tracer, config) +func NewServer(ctx context.Context, 
logger hclog.Logger, tracer telemetry.Tracer, config *Config) (Server, error) { + return newServer(ctx, logger, tracer, config) } -func newServer(logger hclog.Logger, tracer telemetry.Tracer, config *Config) (*DefaultServer, error) { +func newServer( + ctx context.Context, + logger hclog.Logger, + tracer telemetry.Tracer, + config *Config, +) (*DefaultServer, error) { logger = logger.Named("network") - span := tracer.Start("init") + span := tracer.StartWithParentFromContext(ctx, "network.newServer") defer span.End() key, err := setupLibp2pKey(config.SecretsManager) @@ -266,7 +274,7 @@ func setupLibp2pKey(secretsManager secrets.SecretsManager) (crypto.PrivKey, erro // Start starts the networking services func (s *DefaultServer) Start() error { - span := s.tracer.Start("start") + span := s.tracer.Start("network.Start") defer span.End() s.logger.Info("LibP2P server running", "addr", common.AddrInfoToString(s.AddrInfo())) @@ -303,8 +311,11 @@ func (s *DefaultServer) Start() error { s.host.Network().Notify(&network.NotifyBundle{ DisconnectedF: func(net network.Network, conn network.Conn) { s.logger.Info("peer disconnected, remove peer connect info", "id", conn.RemotePeer()) + span := s.tracer.Start("network.DisconnectedF") + defer span.End() + // Update the local connection metrics - s.removePeerConnect(conn.RemotePeer(), conn.Stat().Direction) + s.removePeerConnect(context.Background(), conn.RemotePeer(), conn.Stat().Direction) }, }) @@ -511,13 +522,22 @@ func (s *DefaultServer) runDial() { } func (s *DefaultServer) Connect(peerInfo peer.AddrInfo) error { + span := s.tracer.Start("network.Connect") + defer span.End() + if !s.HasPeer(peerInfo.ID) && s.selfID != peerInfo.ID { + span.AddEvent("dialing_peer", map[string]interface{}{ + "peer": peerInfo.ID.String(), + "addr": peerInfo.Addrs, + }) + // the connection process is async because it involves connection (here) + // the handshake done in the identity service. 
ctx := network.WithDialPeerTimeout(context.Background(), DefaultDialTimeout) if err := s.host.Connect(ctx, peerInfo); err != nil { s.logger.Debug("failed to dial", "addr", peerInfo.String(), "err", err.Error()) + span.SetError(err) s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect) @@ -588,7 +608,18 @@ func (s *DefaultServer) GetProtocols(peerID peer.ID) ([]string, error) { // removePeerConnect removes a peer from the networking server's peer list, // and updates relevant counters and metrics. It only called from the // disconnection callback of the libp2p network bundle (when the connection is closed) -func (s *DefaultServer) removePeerConnect(peerID peer.ID, direction network.Direction) { +func (s *DefaultServer) removePeerConnect(ctx context.Context, peerID peer.ID, direction network.Direction) { + span := func() telemetry.Span { + const spanName = "network.remove_peer_connect" + + if ctx == nil { + return s.tracer.Start(spanName) + } else { + return s.tracer.StartWithParentFromContext(ctx, spanName) + } + }() + defer span.End() + s.peersLock.Lock() defer s.peersLock.Unlock() @@ -601,6 +632,10 @@ func (s *DefaultServer) removePeerConnect(peerID peer.ID, direction network.Dire s.logger.Warn( fmt.Sprintf("Attempted removing missing peer info %s", peerID), ) + span.AddEvent("remove_peer_store", map[string]interface{}{ + "peer": peerID.String(), + "dorection": direction.String(), + }) // NOTO: Remove the peer from the peer store, // if not removed, host.Network().Notify never triggered @@ -619,7 +654,7 @@ func (s *DefaultServer) removePeerConnect(peerID peer.ID, direction network.Dire // No more connections to the peer, remove it from the peers map delete(s.peers, peerID) - if errs := connectionInfo.cleanProtocolStreams(); len(errs) > 0 { + if errs := connectionInfo.cleanProtocolStreams(span.Context(), s.tracer); len(errs) > 0 { for _, err := range errs { if err != nil { s.logger.Error("close protocol streams failed", "err", err) @@ -627,6 +662,11 @@ func (s 
*DefaultServer) removePeerConnect(peerID peer.ID, direction network.Dire } } + span.AddEvent("remove_peer_store", map[string]interface{}{ + "peer": peerID.String(), + "dorection": direction.String(), + }) + // NOTO: Remove the peer from the peer store, // if not removed, host.Network().Notify never triggered s.RemoveFromPeerStore(peerID) diff --git a/network/server_discovery.go b/network/server_discovery.go index 37606a2314..d7a954c66e 100644 --- a/network/server_discovery.go +++ b/network/server_discovery.go @@ -86,6 +86,9 @@ func (s *DefaultServer) AddToPeerStore(peerInfo *peer.AddrInfo) { // RemoveFromPeerStore removes peer information from the node's peer store, ignoring static nodes and bootnodes func (s *DefaultServer) RemoveFromPeerStore(peerID peer.ID) { + span := s.tracer.Start("network.RemoveFromPeerStore") + defer span.End() + s.host.Peerstore().RemovePeer(peerID) s.host.Peerstore().ClearAddrs(peerID) } diff --git a/network/server_identity.go b/network/server_identity.go index f4a3f80511..5b33eafd9f 100644 --- a/network/server_identity.go +++ b/network/server_identity.go @@ -110,7 +110,7 @@ func (s *DefaultServer) setupIdentity() error { return fmt.Errorf("identity service already initialized") } - span := s.tracer.Start("setupIdentity") + span := s.tracer.Start("network.setupIdentity") defer span.End() // Create an instance of the identity service diff --git a/network/server_test.go b/network/server_test.go index 3d4b422913..28d29d36be 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -1072,7 +1072,7 @@ func TestPeerAdditionDeletion(t *testing.T) { peerInfo, ok := server.peers[randomPeers[i].peerID] if ok { for direction := range peerInfo.connDirections { - server.removePeerConnect(randomPeers[i].peerID, direction) + server.removePeerConnect(nil, randomPeers[i].peerID, direction) } } diff --git a/server/server.go b/server/server.go index 416be64b23..abc9a5b712 100644 --- a/server/server.go +++ b/server/server.go @@ -224,7 +224,7 @@ 
func NewServer(config *Config) (*Server, error) { trace := m.tracerProvider.NewTracer("network") - network, err := network.NewServer(logger, trace, netConfig) + network, err := network.NewServer(m.ctx, logger, trace, netConfig) if err != nil { return nil, err } From 511a9542cc6933620ef0c988c1235799c3614c3d Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Tue, 7 Mar 2023 18:29:16 +0800 Subject: [PATCH 6/9] add more tracer --- helper/telemetry/interface.go | 3 +++ helper/telemetry/jaeger.go | 13 +++++++-- helper/telemetry/nil_provider.go | 9 ++++++- network/discovery/discovery.go | 5 +++- network/identity/identity.go | 11 +++++--- network/keep_available.go | 45 +++++++++++++++++++++++++++----- network/server.go | 45 +++++++++++++++++++++++++------- network/server_discovery.go | 3 ++- network/server_identity.go | 10 ++++--- network/server_test.go | 6 ++--- network/testing/testing.go | 6 ++--- server/server.go | 4 --- 12 files changed, 122 insertions(+), 38 deletions(-) diff --git a/helper/telemetry/interface.go b/helper/telemetry/interface.go index 308509e8f8..cec8927a39 100644 --- a/helper/telemetry/interface.go +++ b/helper/telemetry/interface.go @@ -63,6 +63,9 @@ type Tracer interface { // StartWithParentFromContext starts a new span with a parent from context StartWithParentFromContext(ctx context.Context, name string) Span + + // GetTraceProvider returns the trace provider + GetTraceProvider() TracerProvider } type TracerProvider interface { diff --git a/helper/telemetry/jaeger.go b/helper/telemetry/jaeger.go index 1f2d805b49..86fab4fc43 100644 --- a/helper/telemetry/jaeger.go +++ b/helper/telemetry/jaeger.go @@ -133,6 +133,9 @@ type jaegerTracer struct { // tracer tracer trace.Tracer + + // provider + provider *jaegerTracerProvider } // Start starts a new span @@ -178,6 +181,11 @@ func (t *jaegerTracer) StartWithParentFromContext(ctx context.Context, name stri } } +// GetTracerProvider returns the tracer provider +func (t *jaegerTracer) 
GetTraceProvider() TracerProvider { + return t.provider +} + // jaegerTracerProvider type jaegerTracerProvider struct { // context @@ -190,8 +198,9 @@ type jaegerTracerProvider struct { // NewTracer creates a new tracer func (p *jaegerTracerProvider) NewTracer(namespace string) Tracer { return &jaegerTracer{ - context: p.context, - tracer: p.provider.Tracer(namespace), + context: p.context, + tracer: p.provider.Tracer(namespace), + provider: p, } } diff --git a/helper/telemetry/nil_provider.go b/helper/telemetry/nil_provider.go index 81b6dd2f6c..8247afba92 100644 --- a/helper/telemetry/nil_provider.go +++ b/helper/telemetry/nil_provider.go @@ -42,6 +42,7 @@ func (s *nilSpan) Context() context.Context { // nilTracer type nilTracer struct { + provider *nilTracerProvider } // Start starts a new span @@ -58,13 +59,19 @@ func (t *nilTracer) StartWithParentFromContext(ctx context.Context, name string) return &nilSpan{} } +func (t *nilTracer) GetTraceProvider() TracerProvider { + return t.provider +} + // nilTracerProvider type nilTracerProvider struct { } // NewTracer creates a new tracer func (p *nilTracerProvider) NewTracer(namespace string) Tracer { - return &nilTracer{} + return &nilTracer{ + provider: p, + } } // Shutdown shuts down the tracer provider diff --git a/network/discovery/discovery.go b/network/discovery/discovery.go index c3b868d77a..925f688c6c 100644 --- a/network/discovery/discovery.go +++ b/network/discovery/discovery.go @@ -167,6 +167,7 @@ type DiscoveryService struct { baseServer networkingServer // The interface towards the base networking server logger hclog.Logger // The DiscoveryService logger + tracer telemetry.Tracer // tracer for the IdentityService routingTable *kb.RoutingTable // Kademlia 'k-bucket' routing table that contains connected nodes info peerAddress *peerAddreStore // stores the peer address information @@ -190,6 +191,7 @@ func NewDiscoveryService( return &DiscoveryService{ baseServer: server, logger: logger.Named("discovery"), + 
tracer: server.GetTracer().GetTraceProvider().NewTracer("discovery"), routingTable: routingTable, peerAddress: newPeerAddreStore(), ignoreCIDR: ignoreCIDR, @@ -232,7 +234,7 @@ func (d *DiscoveryService) HandleNetworkEvent(peerEvent *event.PeerEvent) { peerID := peerEvent.PeerID // create tracer span - span := d.baseServer.GetTracer().StartWithParent( + span := d.tracer.StartWithParent( peerEvent.SpanContext, "discovery.HandleNetworkEvent", ) @@ -246,6 +248,7 @@ func (d *DiscoveryService) HandleNetworkEvent(peerEvent *event.PeerEvent) { if err != nil { d.logger.Error("failed to add peer to routing table", "err", err) span.SetError(err) + span.SetStatus(telemetry.Error, "failed to add peer to routing table") return } diff --git a/network/identity/identity.go b/network/identity/identity.go index c968ed5237..cb29e21153 100644 --- a/network/identity/identity.go +++ b/network/identity/identity.go @@ -45,7 +45,7 @@ type networkingServer interface { UpdatePendingConnCount(delta int64, direction network.Direction) // EmitEvent emits the specified peer event on the base networking server - EmitEvent(event *event.PeerEvent) + EmitEvent(ctx context.Context, event *event.PeerEvent) // CONNECTION INFORMATION // @@ -64,7 +64,9 @@ type IdentityService struct { pendingPeerConnections map[peer.ID]struct{} // Map that keeps track of the pending status of peers; peerID -> bool pendingCountMux sync.RWMutex // Mutex for the pendingPeerConnections map - logger hclog.Logger // The IdentityService logger + logger hclog.Logger // The IdentityService logger + tracer telemetry.Tracer // tracer for the IdentityService + baseServer networkingServer // The interface towards the base networking server chainID int64 // The chain ID of the network @@ -80,6 +82,7 @@ func NewIdentityService( ) *IdentityService { return &IdentityService{ logger: logger.Named("identity"), + tracer: server.GetTracer().GetTraceProvider().NewTracer("identity"), baseServer: server, chainID: chainID, hostID: hostID, @@ 
-156,7 +159,9 @@ func (i *IdentityService) GetNotifyBundle() *network.NotifyBundle { i.disconnectFromPeer(peerID, err.Error()) i.logger.Debug("send PeerFailedToConnect event", "peer", peerID) + span.SetError(err) + span.SetStatus(telemetry.Error, "identity check failed") connectEvent.Type = event.PeerFailedToConnect } @@ -164,7 +169,7 @@ func (i *IdentityService) GetNotifyBundle() *network.NotifyBundle { i.removePendingStatus(peerID, direction) // Emit an adequate event - i.baseServer.EmitEvent(connectEvent) + i.baseServer.EmitEvent(span.Context(), connectEvent) }() }, } diff --git a/network/keep_available.go b/network/keep_available.go index 1941200e7c..1035af5cd0 100644 --- a/network/keep_available.go +++ b/network/keep_available.go @@ -1,10 +1,12 @@ package network import ( + "context" "math/rand" "sync" "time" + "github.com/dogechain-lab/dogechain/helper/telemetry" "github.com/dogechain-lab/dogechain/network/common" "github.com/hashicorp/go-hclog" "github.com/libp2p/go-libp2p/core/peer" @@ -40,6 +42,9 @@ type keepAvailable struct { // logger logger hclog.Logger + // tracer for keep available + tracer telemetry.Tracer + // Keep available status status keepAvailableStatus @@ -77,9 +82,11 @@ func newKeepAvailable(server *DefaultServer) *keepAvailable { batchDialPeers := (int(server.connectionCounts.maxOutboundConnCount()) / 4) + 1 // 25% of max outbound peers pendingConnectMark := make(map[peer.ID]struct{}) logger := server.logger.Named("keep_available") + tracer := server.tracer.GetTraceProvider().NewTracer("keepAvailable") return &keepAvailable{ server: server, + tracer: tracer, logger: logger, status: kasSleeping, timer: time.NewTimer(DefaultBackgroundTaskSleep), @@ -125,6 +132,9 @@ func (ka *keepAvailable) fsm() { ka.closeWG.Add(1) defer ka.closeWG.Done() + span := ka.tracer.Start("keepAvailable.fsm") + defer span.End() + // **status change sequence** // kasWeakUp -> checkDiscoveryServiceReady() // if Discovery service not ready -> state set 
kasDiscoveryWaiting @@ -154,19 +164,19 @@ func (ka *keepAvailable) fsm() { switch ka.status { case kasWeakUp: // first check discovery service is ready - ka.checkDiscoveryServiceReady() + ka.checkDiscoveryServiceReady(span.Context()) case kasDiscoveryWaiting: ka.timer.Reset(waitDiscoverDuration) return case kasMarkConnectionPending: - ka.markPendingConnection() + ka.markPendingConnection(span.Context()) case kasMaxPeerSleeping: ka.timer.Reset(connectFullSleepDuration) return case kasRandomDialed: - ka.randomDial() + ka.randomDial(span.Context()) case kasDialSleeping: ka.timer.Reset(doubleSleepDuration) @@ -180,25 +190,37 @@ func (ka *keepAvailable) fsm() { } // checkDiscoveryServiceReady check the discovery service is ready -func (ka *keepAvailable) checkDiscoveryServiceReady() { +func (ka *keepAvailable) checkDiscoveryServiceReady(ctx context.Context) { + span := ka.tracer.Start("keepAvailable.checkDiscoveryServiceReady") + defer span.End() + if ka.server.discovery == nil { ka.status = kasDiscoveryWaiting + span.SetAttribute("discovery", "nil") + return } + span.SetAttribute("discovery", "not nil") + // next status is mark pending connection ka.status = kasMarkConnectionPending } // markPendingConnection mark the connection as pending -func (ka *keepAvailable) markPendingConnection() { +func (ka *keepAvailable) markPendingConnection(ctx context.Context) { + span := ka.tracer.StartWithParentFromContext(ctx, "keepAvailable.markPendingConnection") + defer span.End() + var waitingPeersDisconnect sync.WaitGroup disconnectFlag := false peers := ka.server.host.Network().Peers() + ka.logger.Debug("ready connections", "count", len(peers)) + span.SetAttribute("ready_connections", len(peers)) for _, peerID := range peers { if peerID == ka.selfID { @@ -228,6 +250,10 @@ func (ka *keepAvailable) markPendingConnection() { // disconnect peer, but peersLock is locked, so use goroutine waitingPeersDisconnect.Add(1) + span.AddEvent("disconnect_peer", map[string]interface{}{ + 
"peer": peerID, + }) + go func(peerID peer.ID) { defer waitingPeersDisconnect.Done() ka.server.DisconnectFromPeer(peerID, "bye") @@ -254,6 +280,8 @@ func (ka *keepAvailable) markPendingConnection() { ka.pendingConnectMark = copyMark + span.SetAttribute("pendingConnectMark", copyMark) + // next check max peer count if len(peers) >= ka.maxPeers { ka.status = kasMaxPeerSleeping @@ -273,7 +301,10 @@ func (ka *keepAvailable) markPendingConnection() { } // randomDial random dialed peer -func (ka *keepAvailable) randomDial() { +func (ka *keepAvailable) randomDial(ctx context.Context) { + span := ka.tracer.StartWithParentFromContext(ctx, "keepAvailable.randomDial") + defer span.End() + // defer set status to sleep, and wait group done defer func() { ka.status = kasSleeping @@ -309,7 +340,7 @@ func (ka *keepAvailable) randomDial() { peerInfo := ka.server.discovery.GetConfirmPeerInfo(randPeer) if peerInfo != nil { ka.logger.Debug("dialing random peer", "peer", peerInfo) - ka.server.addToDialQueue(peerInfo, common.PriorityRandomDial) + ka.server.addToDialQueue(span.Context(), peerInfo, common.PriorityRandomDial) isDial = true dialCount++ diff --git a/network/server.go b/network/server.go index 24d1070777..1620c54054 100644 --- a/network/server.go +++ b/network/server.go @@ -135,6 +135,7 @@ func newServer( listenAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.Addr.IP.String(), config.Addr.Port)) if err != nil { span.SetError(err) + span.SetStatus(telemetry.Error, "failed to create listen address") return nil, err } @@ -388,6 +389,8 @@ func (s *DefaultServer) keepAliveStaticPeerConnections() { allConnected = true + span := s.tracer.Start("network.keepAliveStaticPeerConnections") + s.staticnodes.rangeAddrs(func(add *peer.AddrInfo) bool { if s.host.Network().Connectedness(add.ID) == network.Connected { return true @@ -398,10 +401,12 @@ func (s *DefaultServer) keepAliveStaticPeerConnections() { } s.logger.Info("reconnect static peer", "addr", add.String()) 
- s.addToDialQueue(add, common.PriorityRequestedDial) + s.addToDialQueue(span.Context(), add, common.PriorityRequestedDial) return true }) + + span.End() } } @@ -538,8 +543,9 @@ func (s *DefaultServer) Connect(peerInfo peer.AddrInfo) error { if err := s.host.Connect(ctx, peerInfo); err != nil { s.logger.Debug("failed to dial", "addr", peerInfo.String(), "err", err.Error()) span.SetError(err) + span.SetStatus(telemetry.Error, "connect failed") - s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect) + s.emitEvent(span.Context(), peerInfo.ID, peerEvent.PeerFailedToConnect) return err } @@ -610,7 +616,7 @@ func (s *DefaultServer) GetProtocols(peerID peer.ID) ([]string, error) { // disconnection callback of the libp2p network bundle (when the connection is closed) func (s *DefaultServer) removePeerConnect(ctx context.Context, peerID peer.ID, direction network.Direction) { span := func() telemetry.Span { - const spanName = "network.remove_peer_connect" + const spanName = "network.removePeerConnect" if ctx == nil { return s.tracer.Start(spanName) @@ -672,7 +678,7 @@ func (s *DefaultServer) removePeerConnect(ctx context.Context, peerID peer.ID, d s.RemoveFromPeerStore(peerID) // Emit the event alerting listeners - s.emitEvent(peerID, peerEvent.PeerDisconnected) + s.emitEvent(span.Context(), peerID, peerEvent.PeerDisconnected) } s.metrics.SetTotalPeerCount( @@ -702,7 +708,14 @@ func (s *DefaultServer) ForgetPeer(peer peer.ID, reason string) { // DisconnectFromPeer disconnects the networking server from the specified peer func (s *DefaultServer) DisconnectFromPeer(peerID peer.ID, reason string) { + span := s.tracer.Start("network.DisconnectFromPeer") + defer span.End() + + span.SetAttribute("peer_id", peerID.String()) + if s.IsStaticPeer(peerID) && s.HasPeer(peerID) { + span.SetAttribute("ignore_static_peer", true) + return } @@ -714,6 +727,8 @@ func (s *DefaultServer) DisconnectFromPeer(peerID peer.ID, reason string) { // Close the peer connection if closeErr := 
s.host.Network().ClosePeer(peerID); closeErr != nil { s.logger.Error("unable to gracefully close peer connection", "err", closeErr) + span.SetError(closeErr) + span.SetStatus(telemetry.Error, "closer peer connection failed") } } @@ -726,6 +741,9 @@ var ( // JoinPeer attempts to add a new peer to the networking server func (s *DefaultServer) JoinPeer(rawPeerMultiaddr string, static bool) error { + span := s.tracer.Start("network.JoinPeer") + defer span.End() + // Parse the raw string to a MultiAddr format parsedMultiaddr, err := multiaddr.NewMultiaddr(rawPeerMultiaddr) if err != nil { @@ -749,7 +767,7 @@ func (s *DefaultServer) JoinPeer(rawPeerMultiaddr string, static bool) error { // Mark the peer as ripe for dialing (async) s.logger.Info("start join peer", "addr", peerInfo.String()) - s.addToDialQueue(peerInfo, common.PriorityRequestedDial) + s.addToDialQueue(span.Context(), peerInfo, common.PriorityRequestedDial) return nil } @@ -880,20 +898,27 @@ func (s *DefaultServer) AddrInfo() *peer.AddrInfo { } } -func (s *DefaultServer) addToDialQueue(addr *peer.AddrInfo, priority common.DialPriority) { +func (s *DefaultServer) addToDialQueue(ctx context.Context, addr *peer.AddrInfo, priority common.DialPriority) { + span := s.tracer.StartWithParentFromContext(ctx, "network.addToDialQueue") + defer span.End() + if s.selfID == addr.ID { return } s.dialQueue.AddTask(addr, priority) - s.emitEvent(addr.ID, peerEvent.PeerAddedToDialQueue) + s.emitEvent(span.Context(), addr.ID, peerEvent.PeerAddedToDialQueue) } -func (s *DefaultServer) emitEvent(peerID peer.ID, peerEventType peerEvent.PeerEventType) { +func (s *DefaultServer) emitEvent(ctx context.Context, peerID peer.ID, peerEventType peerEvent.PeerEventType) { + span := s.tracer.StartWithParentFromContext(ctx, "network.emitEvent") + defer span.End() + // POTENTIALLY BLOCKING if err := s.emitterPeerEvent.Emit(peerEvent.PeerEvent{ - PeerID: peerID, - Type: peerEventType, + PeerID: peerID, + Type: peerEventType, + SpanContext: 
span.SpanContext(), }); err != nil { s.logger.Info("failed to emit event", "peer", peerID, "type", peerEventType, "err", err) } diff --git a/network/server_discovery.go b/network/server_discovery.go index d7a954c66e..ee78fcfc35 100644 --- a/network/server_discovery.go +++ b/network/server_discovery.go @@ -154,7 +154,8 @@ func (s *DefaultServer) setupDiscovery() error { // check peer is not connected and has free outbound connections if s.connectionCounts.HasFreeOutboundConn() && !s.HasPeer(p) { info := s.host.Peerstore().PeerInfo(p) - s.addToDialQueue(&info, common.PriorityRandomDial) + // TODO: use DefaultServer.ctx replace context.Background() + s.addToDialQueue(context.Background(), &info, common.PriorityRandomDial) } } diff --git a/network/server_identity.go b/network/server_identity.go index 5b33eafd9f..178441d6d0 100644 --- a/network/server_identity.go +++ b/network/server_identity.go @@ -1,6 +1,7 @@ package network import ( + "context" "fmt" "math/big" @@ -32,6 +33,9 @@ func (s *DefaultServer) NewIdentityClient(peerID peer.ID) (client.IdentityClient // AddPeer adds a new peer to the networking server's peer list, // and updates relevant counters and metrics func (s *DefaultServer) AddPeer(id peer.ID, direction network.Direction) { + span := s.tracer.Start("network.AddPeer") + defer span.End() + s.logger.Info("Peer connected", "id", id.String()) // Update the peer connection info @@ -45,7 +49,7 @@ func (s *DefaultServer) AddPeer(id peer.ID, direction network.Direction) { // WARNING: THIS CALL IS POTENTIALLY BLOCKING // UNDER HEAVY LOAD. 
IT SHOULD BE SUBSTITUTED // WITH AN EVENT SYSTEM THAT ACTUALLY WORKS - s.emitEvent(id, peerEvent.PeerConnected) + s.emitEvent(span.Context(), id, peerEvent.PeerConnected) } // addPeerInfo updates the networking server's internal peer info table @@ -100,8 +104,8 @@ func (s *DefaultServer) UpdatePendingConnCount(delta int64, direction network.Di } // EmitEvent emits a specified event to the networking server's event bus -func (s *DefaultServer) EmitEvent(event *peerEvent.PeerEvent) { - s.emitEvent(event.PeerID, event.Type) +func (s *DefaultServer) EmitEvent(ctx context.Context, event *peerEvent.PeerEvent) { + s.emitEvent(ctx, event.PeerID, event.Type) } // setupIdentity sets up the identity service for the node diff --git a/network/server_test.go b/network/server_test.go index 28d29d36be..f816873b74 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -205,7 +205,7 @@ func TestPeerEvent_EmitAndSubscribe(t *testing.T) { t.Run("Serial event emit and read", func(t *testing.T) { for i := 0; i < count; i++ { id, event := getIDAndEventType(i) - go server.emitEvent(id, event) + go server.emitEvent(context.Background(), id, event) select { case <-ctx.Done(): @@ -223,7 +223,7 @@ func TestPeerEvent_EmitAndSubscribe(t *testing.T) { go func() { for i := 0; i < count; i++ { id, event := getIDAndEventType(i) - server.emitEvent(id, event) + server.emitEvent(context.Background(), id, event) } }() @@ -1072,7 +1072,7 @@ func TestPeerAdditionDeletion(t *testing.T) { peerInfo, ok := server.peers[randomPeers[i].peerID] if ok { for direction := range peerInfo.connDirections { - server.removePeerConnect(nil, randomPeers[i].peerID, direction) + server.removePeerConnect(context.Background(), randomPeers[i].peerID, direction) } } diff --git a/network/testing/testing.go b/network/testing/testing.go index 7d01a2ba7a..9892e52bf9 100644 --- a/network/testing/testing.go +++ b/network/testing/testing.go @@ -75,7 +75,7 @@ type connectDelegate func(addrInfo peer.AddrInfo) error type 
disconnectFromPeerDelegate func(peer.ID, string) type addPeerDelegate func(peer.ID, network.Direction) type updatePendingConnCountDelegate func(int64, network.Direction) -type emitEventDelegate func(*event.PeerEvent) +type emitEventDelegate func(context.Context, *event.PeerEvent) type hasFreeConnectionSlotDelegate func(network.Direction) bool // Required for Discovery @@ -147,9 +147,9 @@ func (m *MockNetworkingServer) HookUpdatePendingConnCount(fn updatePendingConnCo m.updatePendingConnCountFn = fn } -func (m *MockNetworkingServer) EmitEvent(event *event.PeerEvent) { +func (m *MockNetworkingServer) EmitEvent(ctx context.Context, event *event.PeerEvent) { if m.emitEventFn != nil { - m.emitEventFn(event) + m.emitEventFn(ctx, event) } } diff --git a/server/server.go b/server/server.go index abc9a5b712..b1f119cf67 100644 --- a/server/server.go +++ b/server/server.go @@ -35,7 +35,6 @@ import ( "github.com/hashicorp/go-hclog" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "go.opentelemetry.io/otel" "google.golang.org/grpc" ) @@ -159,9 +158,6 @@ func newLevelDBBuilder(logger hclog.Logger, config *Config, path string) kvdb.Le // NewServer creates a new Minimal server, using the passed in configuration func NewServer(config *Config) (*Server, error) { - _, span := otel.Tracer("server").Start(context.Background(), "NewServer") - defer span.End() - logger, err := newLoggerFromConfig(config) if err != nil { return nil, fmt.Errorf("could not setup new logger instance, %w", err) From 156c6df30571a7ca974fc001a083398b9f249d64 Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Wed, 8 Mar 2023 11:00:00 +0800 Subject: [PATCH 7/9] rename, add more Context --- helper/telemetry/interface.go | 12 ++++++++---- helper/telemetry/jaeger.go | 8 +++----- helper/telemetry/nil_provider.go | 5 ++--- network/discovery/discovery.go | 2 +- network/grpc/grpc.go | 19 +++++++++++-------- network/identity/identity.go | 2 +- 
network/keep_available.go | 6 +++--- network/peer_connect.go | 2 +- network/server.go | 16 ++++++++-------- network/server_discovery.go | 2 +- network/server_identity.go | 2 +- protocol/client_test.go | 4 ++-- protocol/service.go | 2 +- 13 files changed, 43 insertions(+), 39 deletions(-) diff --git a/helper/telemetry/interface.go b/helper/telemetry/interface.go index cec8927a39..179ec452e2 100644 --- a/helper/telemetry/interface.go +++ b/helper/telemetry/interface.go @@ -40,8 +40,12 @@ type Span interface { // SetStatus set status SetStatus(code Code, info string) - // SetError set error - SetError(err error) + // RecordError will record err as an exception span event for this span. An + // additional call to SetStatus is required if the Status of the Span should + // be set to Error, as this method does not change the Span status. If this + // span is not being recorded or err is nil then this method does nothing. + // (wrapping otel.Span.RecordError) + RecordError(err error) // End ends the span End() @@ -61,8 +65,8 @@ type Tracer interface { // StartWithParent starts a new span with a parent StartWithParent(parent trace.SpanContext, name string) Span - // StartWithParentFromContext starts a new span with a parent from context - StartWithParentFromContext(ctx context.Context, name string) Span + // StartWithContext starts a new span with a parent from context + StartWithContext(ctx context.Context, name string) Span // GetTraceProvider returns the trace provider GetTraceProvider() TracerProvider diff --git a/helper/telemetry/jaeger.go b/helper/telemetry/jaeger.go index 86fab4fc43..a95b558200 100644 --- a/helper/telemetry/jaeger.go +++ b/helper/telemetry/jaeger.go @@ -104,12 +104,10 @@ func (s *jaegerSpan) SetStatus(code Code, info string) { s.span.SetStatus(codes.Code(code), info) } -// SetError sets the error -func (s *jaegerSpan) SetError(err error) { +func (s *jaegerSpan) RecordError(err error) { s.span.RecordError(err) } -// End ends the span func (s 
*jaegerSpan) End() { s.span.End() } @@ -171,8 +169,8 @@ func (t *jaegerTracer) StartWithParent(parent trace.SpanContext, name string) Sp } } -// StartWithParentFromContext starts a new span with a parent from the context -func (t *jaegerTracer) StartWithParentFromContext(ctx context.Context, name string) Span { +// StartWithContext starts a new span with a parent from the context +func (t *jaegerTracer) StartWithContext(ctx context.Context, name string) Span { childContext, span := t.tracer.Start(ctx, name) return &jaegerSpan{ diff --git a/helper/telemetry/nil_provider.go b/helper/telemetry/nil_provider.go index 8247afba92..ac9ee6b222 100644 --- a/helper/telemetry/nil_provider.go +++ b/helper/telemetry/nil_provider.go @@ -24,8 +24,7 @@ func (s *nilSpan) AddEvent(name string, attributes map[string]interface{}) { func (s *nilSpan) SetStatus(code Code, info string) { } -// SetError sets the error -func (s *nilSpan) SetError(err error) { +func (s *nilSpan) RecordError(err error) { } // End ends the span @@ -55,7 +54,7 @@ func (t *nilTracer) StartWithParent(parent trace.SpanContext, name string) Span return &nilSpan{} } -func (t *nilTracer) StartWithParentFromContext(ctx context.Context, name string) Span { +func (t *nilTracer) StartWithContext(ctx context.Context, name string) Span { return &nilSpan{} } diff --git a/network/discovery/discovery.go b/network/discovery/discovery.go index 925f688c6c..dcd44b1157 100644 --- a/network/discovery/discovery.go +++ b/network/discovery/discovery.go @@ -247,7 +247,7 @@ func (d *DiscoveryService) HandleNetworkEvent(peerEvent *event.PeerEvent) { exist, err := d.routingTable.TryAddPeer(peerID, false, true) if err != nil { d.logger.Error("failed to add peer to routing table", "err", err) - span.SetError(err) + span.RecordError(err) span.SetStatus(telemetry.Error, "failed to add peer to routing table") return diff --git a/network/grpc/grpc.go b/network/grpc/grpc.go index a967f9244a..a00d831563 100644 --- a/network/grpc/grpc.go +++ 
b/network/grpc/grpc.go @@ -5,28 +5,31 @@ import ( "errors" "io" "net" - - "google.golang.org/grpc/credentials/insecure" + "sync" "github.com/dogechain-lab/dogechain/helper/common" - manet "github.com/multiformats/go-multiaddr/net" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + manet "github.com/multiformats/go-multiaddr/net" grpcPeer "google.golang.org/grpc/peer" ) type GrpcStream struct { - ctx context.Context - ctxCancel context.CancelFunc + ctx context.Context + ctxCancel context.CancelFunc + ctxCancelOnce sync.Once streamCh chan network.Stream grpcServer *grpc.Server } -func NewGrpcStream() *GrpcStream { - ctx, cancel := context.WithCancel(context.Background()) +func NewGrpcStream(ctx context.Context) *GrpcStream { + ctx, cancel := context.WithCancel(ctx) return &GrpcStream{ ctx: ctx, @@ -121,7 +124,7 @@ func (g *GrpcStream) Addr() net.Addr { } func (g *GrpcStream) Close() error { - g.ctxCancel() + g.ctxCancelOnce.Do(g.ctxCancel) return nil } diff --git a/network/identity/identity.go b/network/identity/identity.go index cb29e21153..e0cd446a76 100644 --- a/network/identity/identity.go +++ b/network/identity/identity.go @@ -160,7 +160,7 @@ func (i *IdentityService) GetNotifyBundle() *network.NotifyBundle { i.logger.Debug("send PeerFailedToConnect event", "peer", peerID) - span.SetError(err) + span.RecordError(err) span.SetStatus(telemetry.Error, "identity check failed") connectEvent.Type = event.PeerFailedToConnect diff --git a/network/keep_available.go b/network/keep_available.go index 1035af5cd0..d458f706da 100644 --- a/network/keep_available.go +++ b/network/keep_available.go @@ -191,7 +191,7 @@ func (ka *keepAvailable) fsm() { // checkDiscoveryServiceReady check the discovery service is ready func (ka *keepAvailable) checkDiscoveryServiceReady(ctx context.Context) { - span := ka.tracer.Start("keepAvailable.checkDiscoveryServiceReady") + span := 
ka.tracer.StartWithContext(ctx, "keepAvailable.checkDiscoveryServiceReady") defer span.End() if ka.server.discovery == nil { @@ -210,7 +210,7 @@ func (ka *keepAvailable) checkDiscoveryServiceReady(ctx context.Context) { // markPendingConnection mark the connection as pending func (ka *keepAvailable) markPendingConnection(ctx context.Context) { - span := ka.tracer.StartWithParentFromContext(ctx, "keepAvailable.markPendingConnection") + span := ka.tracer.StartWithContext(ctx, "keepAvailable.markPendingConnection") defer span.End() var waitingPeersDisconnect sync.WaitGroup @@ -302,7 +302,7 @@ func (ka *keepAvailable) markPendingConnection(ctx context.Context) { // randomDial random dialed peer func (ka *keepAvailable) randomDial(ctx context.Context) { - span := ka.tracer.StartWithParentFromContext(ctx, "keepAvailable.randomDial") + span := ka.tracer.StartWithContext(ctx, "keepAvailable.randomDial") defer span.End() // defer set status to sleep, and wait group done diff --git a/network/peer_connect.go b/network/peer_connect.go index 798a413bf6..fc8f9e67b6 100644 --- a/network/peer_connect.go +++ b/network/peer_connect.go @@ -56,7 +56,7 @@ func (pci *PeerConnInfo) addProtocolClient(protocol string, stream client.GrpcCl // cleanProtocolStreams clean and closes all protocol stream func (pci *PeerConnInfo) cleanProtocolStreams(ctx context.Context, trace telemetry.Tracer) []error { errs := []error{} - span := trace.StartWithParentFromContext(ctx, "cleanProtocolStreams") + span := trace.StartWithContext(ctx, "PeerConnInfo.cleanProtocolStreams") for protocolName, clt := range pci.protocolClient { if clt != nil { diff --git a/network/server.go b/network/server.go index 1620c54054..36cc56a7e0 100644 --- a/network/server.go +++ b/network/server.go @@ -121,7 +121,7 @@ func newServer( ) (*DefaultServer, error) { logger = logger.Named("network") - span := tracer.StartWithParentFromContext(ctx, "network.newServer") + span := tracer.StartWithContext(ctx, "DefaultServer.newServer") 
defer span.End() key, err := setupLibp2pKey(config.SecretsManager) @@ -134,7 +134,7 @@ func newServer( listenAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.Addr.IP.String(), config.Addr.Port)) if err != nil { - span.SetError(err) + span.RecordError(err) span.SetStatus(telemetry.Error, "failed to create listen address") return nil, err @@ -542,7 +542,7 @@ func (s *DefaultServer) Connect(peerInfo peer.AddrInfo) error { if err := s.host.Connect(ctx, peerInfo); err != nil { s.logger.Debug("failed to dial", "addr", peerInfo.String(), "err", err.Error()) - span.SetError(err) + span.RecordError(err) span.SetStatus(telemetry.Error, "connect failed") s.emitEvent(span.Context(), peerInfo.ID, peerEvent.PeerFailedToConnect) @@ -616,12 +616,12 @@ func (s *DefaultServer) GetProtocols(peerID peer.ID) ([]string, error) { // disconnection callback of the libp2p network bundle (when the connection is closed) func (s *DefaultServer) removePeerConnect(ctx context.Context, peerID peer.ID, direction network.Direction) { span := func() telemetry.Span { - const spanName = "network.removePeerConnect" + const spanName = "DefaultServer.removePeerConnect" if ctx == nil { return s.tracer.Start(spanName) } else { - return s.tracer.StartWithParentFromContext(ctx, spanName) + return s.tracer.StartWithContext(ctx, spanName) } }() defer span.End() @@ -727,7 +727,7 @@ func (s *DefaultServer) DisconnectFromPeer(peerID peer.ID, reason string) { // Close the peer connection if closeErr := s.host.Network().ClosePeer(peerID); closeErr != nil { s.logger.Error("unable to gracefully close peer connection", "err", closeErr) - span.SetError(closeErr) + span.RecordError(closeErr) span.SetStatus(telemetry.Error, "closer peer connection failed") } } @@ -899,7 +899,7 @@ func (s *DefaultServer) AddrInfo() *peer.AddrInfo { } func (s *DefaultServer) addToDialQueue(ctx context.Context, addr *peer.AddrInfo, priority common.DialPriority) { - span := s.tracer.StartWithParentFromContext(ctx, 
"network.addToDialQueue") + span := s.tracer.StartWithContext(ctx, "DefaultServer.addToDialQueue") defer span.End() if s.selfID == addr.ID { @@ -911,7 +911,7 @@ func (s *DefaultServer) addToDialQueue(ctx context.Context, addr *peer.AddrInfo, } func (s *DefaultServer) emitEvent(ctx context.Context, peerID peer.ID, peerEventType peerEvent.PeerEventType) { - span := s.tracer.StartWithParentFromContext(ctx, "network.emitEvent") + span := s.tracer.StartWithContext(ctx, "DefaultServer.emitEvent") defer span.End() // POTENTIALLY BLOCKING diff --git a/network/server_discovery.go b/network/server_discovery.go index ee78fcfc35..2af798fc29 100644 --- a/network/server_discovery.go +++ b/network/server_discovery.go @@ -212,7 +212,7 @@ func (s *DefaultServer) setupDiscovery() error { // registerDiscoveryService registers the discovery protocol to be available func (s *DefaultServer) registerDiscoveryService(discovery *discovery.DiscoveryService) { - grpcStream := grpc.NewGrpcStream() + grpcStream := grpc.NewGrpcStream(context.TODO()) proto.RegisterDiscoveryServer(grpcStream.GrpcServer(), discovery) grpcStream.Serve() diff --git a/network/server_identity.go b/network/server_identity.go index 178441d6d0..e282f51a0c 100644 --- a/network/server_identity.go +++ b/network/server_identity.go @@ -136,7 +136,7 @@ func (s *DefaultServer) setupIdentity() error { // registerIdentityService registers the identity service func (s *DefaultServer) registerIdentityService(identityService *identity.IdentityService) { - grpcStream := grpc.NewGrpcStream() + grpcStream := grpc.NewGrpcStream(context.TODO()) proto.RegisterIdentityServer(grpcStream.GrpcServer(), identityService) grpcStream.Serve() diff --git a/protocol/client_test.go b/protocol/client_test.go index e56b2a08a8..720812886a 100644 --- a/protocol/client_test.go +++ b/protocol/client_test.go @@ -52,7 +52,7 @@ func newTestSyncPeerClient(network network.Network, blockchain Blockchain) *sync } // need to register protocol - 
network.RegisterProtocol(_syncerV1, grpc.NewGrpcStream()) + network.RegisterProtocol(_syncerV1, grpc.NewGrpcStream(context.TODO())) return client } @@ -561,7 +561,7 @@ func Test_syncPeerClient_GetBlocks(t *testing.T) { // setupIncompatibleGRPCServer setups an incompatible protocol GRPC server func (s *syncPeerService) setupIncompatibleGRPCServer() { - s.stream = grpc.NewGrpcStream() + s.stream = grpc.NewGrpcStream(context.TODO()) proto.RegisterV1Server(s.stream.GrpcServer(), s) s.stream.Serve() diff --git a/protocol/service.go b/protocol/service.go index c1dd4133f1..273b59cc65 100644 --- a/protocol/service.go +++ b/protocol/service.go @@ -57,7 +57,7 @@ func (s *syncPeerService) SetSyncer(syncer *noForkSyncer) { // setupGRPCServer setup GRPC server func (s *syncPeerService) setupGRPCServer() { - s.stream = grpc.NewGrpcStream() + s.stream = grpc.NewGrpcStream(context.TODO()) proto.RegisterV1Server(s.stream.GrpcServer(), s) s.stream.Serve() From 0728a09dd7075902ad8b26b6722803434e1d8c3f Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Wed, 8 Mar 2023 18:27:55 +0800 Subject: [PATCH 8/9] nil tracer passthrough context --- helper/telemetry/nil_provider.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/helper/telemetry/nil_provider.go b/helper/telemetry/nil_provider.go index ac9ee6b222..c920421df6 100644 --- a/helper/telemetry/nil_provider.go +++ b/helper/telemetry/nil_provider.go @@ -8,6 +8,7 @@ import ( // nilSpan type nilSpan struct { + ctx context.Context } // SetAttribute sets an attribute @@ -36,17 +37,20 @@ func (s *nilSpan) SpanContext() trace.SpanContext { } func (s *nilSpan) Context() context.Context { - return context.Background() + return s.ctx } // nilTracer type nilTracer struct { provider *nilTracerProvider + ctx context.Context } // Start starts a new span func (t *nilTracer) Start(name string) Span { - return &nilSpan{} + return &nilSpan{ + ctx: t.ctx, + } } // StartWithParent starts a new span with a 
parent @@ -64,12 +68,14 @@ func (t *nilTracer) GetTraceProvider() TracerProvider { // nilTracerProvider type nilTracerProvider struct { + ctx context.Context } // NewTracer creates a new tracer func (p *nilTracerProvider) NewTracer(namespace string) Tracer { return &nilTracer{ provider: p, + ctx: p.ctx, } } @@ -80,5 +86,7 @@ func (p *nilTracerProvider) Shutdown(ctx context.Context) error { // NewNilTracerProvider creates a new trace provider func NewNilTracerProvider(ctx context.Context) TracerProvider { - return &nilTracerProvider{} + return &nilTracerProvider{ + ctx, + } } From c783e29faa17eff153ac70d74ef9a87830725ff9 Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Tue, 21 Mar 2023 11:35:16 +0800 Subject: [PATCH 9/9] fix flag check --- command/helper/helper.go | 13 ++++++++++--- command/server/server.go | 4 ++-- network/peer_connect.go | 2 ++ 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/command/helper/helper.go b/command/helper/helper.go index 6d71e95127..53ab3703c2 100644 --- a/command/helper/helper.go +++ b/command/helper/helper.go @@ -276,10 +276,17 @@ func RegisterJaegerFlag(cmd *cobra.Command) { // GetJaegerFlag extracts the telemetry flag func GetJaegerFlag(cmd *cobra.Command) (bool, string) { - v, _ := cmd.Flags().GetBool(command.JaegerFlag) - addr, _ := cmd.Flags().GetString(command.JaegerAddressFlag) + enableJaeger, err := cmd.Flags().GetBool(command.JaegerFlag) + if err == nil && enableJaeger { + addr, err := cmd.Flags().GetString(command.JaegerAddressFlag) + if err != nil { + return false, "" + } + + return true, addr + } - return v, addr + return false, "" } // ParseGraphQLAddress parses the passed in GraphQL address diff --git a/command/server/server.go b/command/server/server.go index e4a53c73ed..8ea9f0fc5e 100644 --- a/command/server/server.go +++ b/command/server/server.go @@ -484,8 +484,8 @@ func runCommand(cmd *cobra.Command, _ []string) { params.rawConfig.EnablePprof = helper.GetPprofFlag(cmd) // jaeger flag
- if enable, jaegerURL := helper.GetJaegerFlag(cmd); enable { - params.rawConfig.Telemetry.EnableJaeger = enable + if enableJaeger, jaegerURL := helper.GetJaegerFlag(cmd); enableJaeger { + params.rawConfig.Telemetry.EnableJaeger = enableJaeger params.rawConfig.Telemetry.JaegerURL = jaegerURL } diff --git a/network/peer_connect.go b/network/peer_connect.go index fc8f9e67b6..8a1bd53a64 100644 --- a/network/peer_connect.go +++ b/network/peer_connect.go @@ -58,6 +58,8 @@ func (pci *PeerConnInfo) cleanProtocolStreams(ctx context.Context, trace telemet errs := []error{} span := trace.StartWithContext(ctx, "PeerConnInfo.cleanProtocolStreams") + defer span.End() + for protocolName, clt := range pci.protocolClient { if clt != nil { err := clt.Close()