Skip to content

Commit

Permalink
Stop waiting for telemetry go routine when stopping dgraph.
Browse files Browse the repository at this point in the history
  • Loading branch information
Arijit Das committed Jan 21, 2020
1 parent ab399b5 commit 8d42b3b
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 78 deletions.
5 changes: 1 addition & 4 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,16 +488,13 @@ func setupServer(closer *y.Closer) {
go serveGRPC(grpcListener, tlsCfg, &wg)
go serveHTTP(httpListener, tlsCfg, &wg)

doneTelemetry := make(chan interface{})
if Alpha.Conf.GetBool("telemetry") {
wg.Add(1)
go edgraph.PeriodicallyPostTelemetry(doneTelemetry, &wg)
go edgraph.PeriodicallyPostTelemetry()
}

go func() {
defer wg.Done()
<-shutdownCh
close(doneTelemetry)

// Stops grpc/http servers; Already accepted connections are not closed.
if err := grpcListener.Close(); err != nil {
Expand Down
45 changes: 20 additions & 25 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,31 +102,26 @@ func (s *Server) periodicallyPostTelemetry() {
defer ticker.Stop()

var lastPostedAt time.Time
for {
select {
case <-ticker.C:
if !s.Node.AmLeader() {
continue
}
if time.Since(lastPostedAt) < time.Hour {
continue
}
ms := s.membershipState()
t := telemetry.NewZero(ms)
if t == nil {
continue
}
t.SinceHours = int(time.Since(start).Hours())
glog.V(2).Infof("Posting Telemetry data: %+v", t)

err := t.Post()
if err == nil {
lastPostedAt = time.Now()
} else {
glog.V(1).Infof("Telemetry couldn't be posted. Error: %v", err)
}
case <-s.closer.HasBeenClosed():
return
for range ticker.C {
if !s.Node.AmLeader() {
continue
}
if time.Since(lastPostedAt) < time.Hour {
continue
}
ms := s.membershipState()
t := telemetry.NewZero(ms)
if t == nil {
continue
}
t.SinceHours = int(time.Since(start).Hours())
glog.V(2).Infof("Posting Telemetry data: %+v", t)

err := t.Post()
if err == nil {
lastPostedAt = time.Now()
} else {
glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
}
}
}
Expand Down
54 changes: 23 additions & 31 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unicode"
Expand Down Expand Up @@ -80,46 +79,39 @@ const (
)

var (
graphqlQueryCount uint64
graphqlpmQueryCount uint64
numQueries uint64
numGraphQL uint64
)

// Server implements protos.DgraphServer
type Server struct{}

// PeriodicallyPostTelemetry periodically reports telemetry data for alpha.
func PeriodicallyPostTelemetry(doneTelemetry <-chan interface{}, wg *sync.WaitGroup) {
func PeriodicallyPostTelemetry() {
glog.V(2).Infof("Starting telemetry data collection for alpha...")

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

var lastPostedAt time.Time
for {
select {
case <-ticker.C:
if time.Since(lastPostedAt) < time.Hour {
continue
}
ms := worker.GetMembershipState()
t := telemetry.NewAlpha(ms)
t.GraphqlQueryCount = graphqlQueryCount
t.GraphqlpmQueryCount = graphqlpmQueryCount
t.SinceHours = int(time.Since(lastPostedAt).Hours())
glog.V(2).Infof("Posting Telemetry data: %+v", t)

err := t.Post()
if err == nil {
atomic.StoreUint64(&graphqlQueryCount, 0)
atomic.StoreUint64(&graphqlpmQueryCount, 0)
lastPostedAt = time.Now()
} else {
glog.V(1).Infof("Telemetry couldn't be posted. Error: %v", err)
}
case <-doneTelemetry:
glog.V(2).Infof("Stopping reporting of Telemetry data.")
wg.Done()
return
for range ticker.C {
if time.Since(lastPostedAt) < time.Hour {
continue
}
ms := worker.GetMembershipState()
t := telemetry.NewAlpha(ms)
t.NumQueries = atomic.SwapUint64(&numQueries, 0)
t.NumGraphQL = atomic.SwapUint64(&numGraphQL, 0)
t.SinceHours = int(time.Since(lastPostedAt).Hours())
glog.V(2).Infof("Posting Telemetry data: %+v", t)

err := t.Post()
if err == nil {
lastPostedAt = time.Now()
} else {
atomic.AddUint64(&numQueries, t.NumQueries)
atomic.AddUint64(&numGraphQL, t.NumGraphQL)
glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
}
}
}
Expand Down Expand Up @@ -711,13 +703,13 @@ func (s *Server) State(ctx context.Context) (*api.Response, error) {

// Query handles queries or mutations
func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) {
atomic.AddUint64(&graphqlpmQueryCount, 1)
atomic.AddUint64(&numGraphQL, 1)
return s.doQuery(ctx, req, NeedAuthorize)
}

// QueryForGraphql handles queries or mutations
func (s *Server) QueryForGraphql(ctx context.Context, req *api.Request) (*api.Response, error) {
atomic.AddUint64(&graphqlQueryCount, 1)
atomic.AddUint64(&numQueries, 1)
return s.doQuery(context.WithValue(ctx, isGraphQL, true), req, NeedAuthorize)
}

Expand Down
38 changes: 20 additions & 18 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
* Copyright 2018 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,22 +31,22 @@ import (

// Telemetry holds information about the state of the zero and alpha server.
type Telemetry struct {
Arch string
Cid string
ClusterSize int
DiskUsageMB int64
NumAlphas int
NumGroups int
NumTablets int
NumZeros int
OS string
SinceHours int
Version string
GraphqlQueryCount uint64
GraphqlpmQueryCount uint64
Arch string
Cid string
ClusterSize int
DiskUsageMB int64
NumAlphas int
NumGroups int
NumTablets int
NumZeros int
OS string
SinceHours int
Version string
NumQueries uint64
NumGraphQL uint64
}

var keenURL = "https://ping.dgraph.io/3.0/projects/5b809dfac9e77c0001783ad0/events"
var url = "https://ping.dgraph.io/3.0/projects/5b809dfac9e77c0001783ad0/events"

// NewZero returns a Telemetry struct that holds information about the state of zero server.
func NewZero(ms *pb.MembershipState) *Telemetry {
Expand Down Expand Up @@ -90,9 +90,11 @@ func (t *Telemetry) Post() error {
if err != nil {
return err
}
url := keenURL + "/dev"

if len(t.Version) > 0 {
url = keenURL + "/pings"
url += "/pings"
} else {
url += "/dev"
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
if err != nil {
Expand All @@ -103,7 +105,7 @@ func (t *Telemetry) Post() error {
"97497CA758881BD7D56CC2355A2F36B4560102CBC3279AC7B27E5391372C36A31167EB0D06BF3764894AD20"+
"A0554BAFF14C292A40BC252BB9FF008736A0FD1D44E085")

client := &http.Client{Timeout: 10 * time.Second}
client := &http.Client{Timeout: time.Second * 5}
resp, err := client.Do(req)
if err != nil {
return err
Expand Down

0 comments on commit 8d42b3b

Please sign in to comment.