Skip to content

Commit

Permalink
Integrate Open Census in Dgraph (dgraph-io#2739)
Browse files Browse the repository at this point in the history
As the first priority, trace the mutation and proposal path. Already found an issue with latency (caused by unnecessary proposal retries), so this integration is already reaping results -- will fix that in the next PR.

Integrated with Jaeger. Can add more exporters over time.
  • Loading branch information
manishrjain authored and dna2github committed Jul 19, 2019
1 parent 35d25d3 commit 3f072c9
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 0 deletions.
37 changes: 37 additions & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ import (
"github.com/golang/glog"
"github.com/spf13/cast"
"github.com/spf13/cobra"
"go.opencensus.io/exporter/jaeger"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view"
otrace "go.opencensus.io/trace"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc"
Expand Down Expand Up @@ -88,6 +92,9 @@ they form a Raft group and provide synchronous replication.
"[mmap, disk] Specifies how Badger Value log is stored."+
" mmap consumes more RAM, but provides better performance.")

flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.")
flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.")

flag.StringP("wal", "w", "w", "Directory to store raft write-ahead logs.")
flag.Bool("nomutations", false, "Don't allow mutations on this server.")
flag.String("whitelist", "",
Expand Down Expand Up @@ -235,14 +242,44 @@ func setupListener(addr string, port int, reload func()) (net.Listener, error) {

func serveGRPC(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) {
defer wg.Done()
if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
glog.Fatalf("Unable to register opencensus: %v", err)
}

handler := &ocgrpc.ServerHandler{
IsPublicEndpoint: true,
StartOptions: otrace.StartOptions{
Sampler: otrace.AlwaysSample(),
},
}
opt := []grpc.ServerOption{
grpc.MaxRecvMsgSize(x.GrpcMaxSize),
grpc.MaxSendMsgSize(x.GrpcMaxSize),
grpc.MaxConcurrentStreams(1000),
grpc.StatsHandler(handler),
}
if tlsCfg != nil {
opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg)))
}

if agent := Alpha.Conf.GetString("jaeger.agent"); len(agent) > 0 {
// Port details: https://www.jaegertracing.io/docs/getting-started/
// Default endpoints are:
// agentEndpointURI := "localhost:6831"
// collectorEndpointURI := "http://localhost:14268"
collector := Alpha.Conf.GetString("jaeger.collector")
je, err := jaeger.NewExporter(jaeger.Options{
AgentEndpoint: agent,
Endpoint: collector,
ServiceName: "dgraph.alpha",
})
if err != nil {
log.Fatalf("Failed to create the Jaeger exporter: %v", err)
}
// And now finally register it as a Trace Exporter
otrace.RegisterExporter(je)
}

s := grpc.NewServer(opt...)
api.RegisterDgraphServer(s, &edgraph.Server{})
hapi.RegisterHealthServer(s, health.NewServer())
Expand Down
4 changes: 4 additions & 0 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
otrace "go.opencensus.io/trace"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -326,6 +327,9 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
}

func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assigned, err error) {
ctx, span := otrace.StartSpan(ctx, "Server.Mutate")
defer span.End()

resp = &api.Assigned{}
if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
Expand Down
7 changes: 7 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/y"
Expand Down Expand Up @@ -179,6 +180,8 @@ func (n *node) applyMutations(proposal *pb.Proposal, index uint64) error {

startTs := proposal.Mutations.StartTs
ctx := n.Ctx(proposal.Key)
ctx, span := otrace.StartSpan(ctx, "node.applyMutations")
defer span.End()

if len(proposal.Mutations.Schema) > 0 {
tr.LazyPrintf("Applying Schema")
Expand Down Expand Up @@ -260,6 +263,7 @@ func (n *node) applyMutations(proposal *pb.Proposal, index uint64) error {
return dy.ErrConflict
}
tr.LazyPrintf("Applying %d edges", len(m.Edges))
span.Annotate([]otrace.Attribute{otrace.Int64Attribute("num", int64(len(m.Edges)))}, "Applying edges")
for _, edge := range m.Edges {
err := posting.ErrRetry
for err == posting.ErrRetry {
Expand All @@ -282,6 +286,9 @@ func (n *node) applyCommitted(proposal *pb.Proposal, index uint64) error {
}

ctx := n.Ctx(proposal.Key)
ctx, span := otrace.StartSpan(ctx, "node.applyCommitted")
defer span.End()

switch {
case len(proposal.Kv) > 0:
return populateKeyValues(ctx, proposal.Kv)
Expand Down
4 changes: 4 additions & 0 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
otrace "go.opencensus.io/trace"
"golang.org/x/net/context"
"golang.org/x/net/trace"
)
Expand Down Expand Up @@ -514,6 +515,9 @@ type res struct {
// MutateOverNetwork checks which group should be running the mutations
// according to the group config and sends it to that instance.
func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) {
ctx, span := otrace.StartSpan(ctx, "worker.MutateOverNetwork")
defer span.End()

tctx := &api.TxnContext{StartTs: m.StartTs}
mutationMap := populateMutationMap(m)

Expand Down
4 changes: 4 additions & 0 deletions worker/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
otrace "go.opencensus.io/trace"
"golang.org/x/net/context"
"golang.org/x/net/trace"
)
Expand Down Expand Up @@ -150,6 +151,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) error
cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

cctx, span := otrace.StartSpan(cctx, "node.propose")
defer span.End()

che := make(chan error, 1)
pctx := &conn.ProposalCtx{
Ch: che,
Expand Down

0 comments on commit 3f072c9

Please sign in to comment.