From 3f072c9b227aa17ce34ae8784e120df72736c032 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Thu, 8 Nov 2018 09:31:46 -0800 Subject: [PATCH] Integrate Open Census in Dgraph (#2739) As the first priority, trace the mutation and proposal path. Already found an issue with latency (caused by unnecessary proposal retries), so this integration is already reaping results -- will fix that in the next PR. Integrated with Jaeger. Can add more exporters over time. --- dgraph/cmd/alpha/run.go | 37 +++++++++++++++++++++++++++++++++++++ edgraph/server.go | 4 ++++ worker/draft.go | 7 +++++++ worker/mutation.go | 4 ++++ worker/proposal.go | 4 ++++ 5 files changed, 56 insertions(+) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 8c287a16664..784c30b898e 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -42,6 +42,10 @@ import ( "github.com/golang/glog" "github.com/spf13/cast" "github.com/spf13/cobra" + "go.opencensus.io/exporter/jaeger" + "go.opencensus.io/plugin/ocgrpc" + "go.opencensus.io/stats/view" + otrace "go.opencensus.io/trace" "golang.org/x/net/context" "golang.org/x/net/trace" "google.golang.org/grpc" @@ -88,6 +92,9 @@ they form a Raft group and provide synchronous replication. "[mmap, disk] Specifies how Badger Value log is stored."+ " mmap consumes more RAM, but provides better performance.") + flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.") + flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.") + flag.StringP("wal", "w", "w", "Directory to store raft write-ahead logs.") flag.Bool("nomutations", false, "Don't allow mutations on this server.") flag.String("whitelist", "", @@ -235,14 +242,44 @@ func setupListener(addr string, port int, reload func()) (net.Listener, error) { func serveGRPC(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) { defer wg.Done() + if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { + glog.Fatalf("Unable to register opencensus: %v", err) + } + + handler := &ocgrpc.ServerHandler{ + IsPublicEndpoint: true, + StartOptions: otrace.StartOptions{ + Sampler: otrace.AlwaysSample(), + }, + } opt := []grpc.ServerOption{ grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize), grpc.MaxConcurrentStreams(1000), + grpc.StatsHandler(handler), } if tlsCfg != nil { opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg))) } + + if agent := Alpha.Conf.GetString("jaeger.agent"); len(agent) > 0 { + // Port details: https://www.jaegertracing.io/docs/getting-started/ + // Default endpoints are: + // agentEndpointURI := "localhost:6831" + // collectorEndpointURI := "http://localhost:14268" + collector := Alpha.Conf.GetString("jaeger.collector") + je, err := jaeger.NewExporter(jaeger.Options{ + AgentEndpoint: agent, + Endpoint: collector, + ServiceName: "dgraph.alpha", + }) + if err != nil { + log.Fatalf("Failed to create the Jaeger exporter: %v", err) + } + // And now finally register it as a Trace Exporter + otrace.RegisterExporter(je) + } + s := grpc.NewServer(opt...) api.RegisterDgraphServer(s, &edgraph.Server{}) hapi.RegisterHealthServer(s, health.NewServer()) diff --git a/edgraph/server.go b/edgraph/server.go index abc8f1a7f63..e390902feb3 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -41,6 +41,7 @@ import ( "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" + otrace "go.opencensus.io/trace" "golang.org/x/net/context" "golang.org/x/net/trace" "google.golang.org/grpc/codes" @@ -326,6 +327,9 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er } func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assigned, err error) { + ctx, span := otrace.StartSpan(ctx, "Server.Mutate") + defer span.End() + resp = &api.Assigned{} if err := x.HealthCheck(); err != nil { if tr, ok := trace.FromContext(ctx); ok { diff --git a/worker/draft.go b/worker/draft.go index cb22dc6b50f..79c6445502c 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -28,6 +28,7 @@ import ( "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + otrace "go.opencensus.io/trace" "github.com/dgraph-io/badger" "github.com/dgraph-io/badger/y" @@ -179,6 +180,8 @@ func (n *node) applyMutations(proposal *pb.Proposal, index uint64) error { startTs := proposal.Mutations.StartTs ctx := n.Ctx(proposal.Key) + ctx, span := otrace.StartSpan(ctx, "node.applyMutations") + defer span.End() if len(proposal.Mutations.Schema) > 0 { tr.LazyPrintf("Applying Schema") @@ -260,6 +263,7 @@ func (n *node) applyMutations(proposal *pb.Proposal, index uint64) error { return dy.ErrConflict } tr.LazyPrintf("Applying %d edges", len(m.Edges)) + span.Annotate([]otrace.Attribute{otrace.Int64Attribute("num", int64(len(m.Edges)))}, "Applying edges") for _, edge := range m.Edges { err := posting.ErrRetry for err == posting.ErrRetry { @@ -282,6 +286,9 @@ func (n *node) applyCommitted(proposal *pb.Proposal, index uint64) error { } ctx := n.Ctx(proposal.Key) + ctx, span := otrace.StartSpan(ctx, "node.applyCommitted") + defer span.End() + switch { case len(proposal.Kv) > 0: return populateKeyValues(ctx, proposal.Kv) diff --git a/worker/mutation.go b/worker/mutation.go index aaa8399ded7..78f23d89fa8 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -35,6 +35,7 @@ import ( "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" + otrace "go.opencensus.io/trace" "golang.org/x/net/context" "golang.org/x/net/trace" ) @@ -514,6 +515,9 @@ type res struct { // MutateOverNetwork checks which group should be running the mutations // according to the group config and sends it to that instance. func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) { + ctx, span := otrace.StartSpan(ctx, "worker.MutateOverNetwork") + defer span.End() + tctx := &api.TxnContext{StartTs: m.StartTs} mutationMap := populateMutationMap(m) diff --git a/worker/proposal.go b/worker/proposal.go index bb78e23b9f1..935392268bd 100644 --- a/worker/proposal.go +++ b/worker/proposal.go @@ -25,6 +25,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/x" + otrace "go.opencensus.io/trace" "golang.org/x/net/context" "golang.org/x/net/trace" ) @@ -150,6 +151,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) error cctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() + cctx, span := otrace.StartSpan(cctx, "node.propose") + defer span.End() + che := make(chan error, 1) pctx := &conn.ProposalCtx{ Ch: che,