Skip to content

Commit

Permalink
Report GraphQL stats from alpha. (#4607)
Browse files Browse the repository at this point in the history
* Report GraphQL stats from alpha.

* Remove old telemetry file.

* Addressed comments.

* Stop waiting for telemetry go routine when stopping dgraph.
  • Loading branch information
Arijit Das authored Jan 21, 2020
1 parent 3b96ce7 commit 4d1d0e0
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 13 deletions.
6 changes: 6 additions & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ they form a Raft group and provide synchronous replication.
"Actual usage by the process would be more than specified here.")
flag.String("mutations", "allow",
"Set mutation mode to allow, disallow, or strict.")
flag.Bool("telemetry", true, "Send anonymous telemetry data to Dgraph devs.")

// Useful for running multiple servers on the same machine.
flag.IntP("port_offset", "o", 0,
Expand Down Expand Up @@ -487,9 +488,14 @@ func setupServer(closer *y.Closer) {
go serveGRPC(grpcListener, tlsCfg, &wg)
go serveHTTP(httpListener, tlsCfg, &wg)

if Alpha.Conf.GetBool("telemetry") {
go edgraph.PeriodicallyPostTelemetry()
}

go func() {
defer wg.Done()
<-shutdownCh

// Stops grpc/http servers; Already accepted connections are not closed.
if err := grpcListener.Close(); err != nil {
glog.Warningf("Error while closing gRPC listener: %s", err)
Expand Down
10 changes: 6 additions & 4 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/telemetry"
"github.com/dgraph-io/dgraph/x"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (s *Server) Init() {
}

func (s *Server) periodicallyPostTelemetry() {
glog.V(2).Infof("Starting telemetry data collection...")
glog.V(2).Infof("Starting telemetry data collection for zero...")
start := time.Now()

ticker := time.NewTicker(time.Minute)
Expand All @@ -109,17 +110,18 @@ func (s *Server) periodicallyPostTelemetry() {
continue
}
ms := s.membershipState()
t := newTelemetry(ms)
t := telemetry.NewZero(ms)
if t == nil {
continue
}
t.SinceHours = int(time.Since(start).Hours())
glog.V(2).Infof("Posting Telemetry data: %+v", t)

err := t.post()
glog.V(2).Infof("Telemetry data posted with error: %v", err)
err := t.Post()
if err == nil {
lastPostedAt = time.Now()
} else {
glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
}
}
}
Expand Down
40 changes: 39 additions & 1 deletion edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"
"unicode"

Expand All @@ -37,6 +38,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/telemetry"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/worker"
Expand Down Expand Up @@ -76,9 +78,44 @@ const (
isGraphQL key = iota
)

var (
numQueries uint64
numGraphQL uint64
)

// Server implements protos.DgraphServer
type Server struct{}

// PeriodicallyPostTelemetry periodically reports telemetry data for alpha.
func PeriodicallyPostTelemetry() {
glog.V(2).Infof("Starting telemetry data collection for alpha...")

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

var lastPostedAt time.Time
for range ticker.C {
if time.Since(lastPostedAt) < time.Hour {
continue
}
ms := worker.GetMembershipState()
t := telemetry.NewAlpha(ms)
t.NumQueries = atomic.SwapUint64(&numQueries, 0)
t.NumGraphQL = atomic.SwapUint64(&numGraphQL, 0)
t.SinceHours = int(time.Since(lastPostedAt).Hours())
glog.V(2).Infof("Posting Telemetry data: %+v", t)

err := t.Post()
if err == nil {
lastPostedAt = time.Now()
} else {
atomic.AddUint64(&numQueries, t.NumQueries)
atomic.AddUint64(&numGraphQL, t.NumGraphQL)
glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
}
}
}

// Alter handles requests to change the schema or remove parts or all of the data.
func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) {
ctx, span := otrace.StartSpan(ctx, "Server.Alter")
Expand Down Expand Up @@ -666,17 +703,18 @@ func (s *Server) State(ctx context.Context) (*api.Response, error) {

// Query handles queries or mutations
func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) {
atomic.AddUint64(&numGraphQL, 1)
return s.doQuery(ctx, req, NeedAuthorize)
}

// QueryForGraphql handles queries or mutations
func (s *Server) QueryForGraphql(ctx context.Context, req *api.Request) (*api.Response, error) {
atomic.AddUint64(&numQueries, 1)
return s.doQuery(context.WithValue(ctx, isGraphQL, true), req, NeedAuthorize)
}

func (s *Server) doQuery(ctx context.Context, req *api.Request, authorize int) (
resp *api.Response, rerr error) {

if ctx.Err() != nil {
return nil, ctx.Err()
}
Expand Down
33 changes: 25 additions & 8 deletions dgraph/cmd/zero/telemetry.go → telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@
* limitations under the License.
*/

package zero
package telemetry

import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"runtime"
"time"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

// Telemetry holds information about the state of the zero server.
// Telemetry holds information about the state of the zero and alpha server.
type Telemetry struct {
Arch string
Cid string
Expand All @@ -41,11 +42,14 @@ type Telemetry struct {
OS string
SinceHours int
Version string
NumQueries uint64
NumGraphQL uint64
}

var keenURL = "https://ping.dgraph.io/3.0/projects/5b809dfac9e77c0001783ad0/events"
var url = "https://ping.dgraph.io/3.0/projects/5b809dfac9e77c0001783ad0/events"

func newTelemetry(ms *pb.MembershipState) *Telemetry {
// NewZero returns a Telemetry struct that holds information about the state of zero server.
func NewZero(ms *pb.MembershipState) *Telemetry {
if len(ms.Cid) == 0 {
glog.V(2).Infoln("No CID found yet")
return nil
Expand All @@ -70,14 +74,27 @@ func newTelemetry(ms *pb.MembershipState) *Telemetry {
return t
}

func (t *Telemetry) post() error {
// NewAlpha returns a Telemetry struct that holds information about the state of alpha server.
func NewAlpha(ms *pb.MembershipState) *Telemetry {
return &Telemetry{
Cid: ms.Cid,
Version: x.Version(),
OS: runtime.GOOS,
Arch: runtime.GOARCH,
}
}

// Post reports the Telemetry to the stats server.
func (t *Telemetry) Post() error {
data, err := json.Marshal(t)
if err != nil {
return err
}
url := keenURL + "/dev"

if len(t.Version) > 0 {
url = keenURL + "/pings"
url += "/pings"
} else {
url += "/dev"
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
if err != nil {
Expand All @@ -88,7 +105,7 @@ func (t *Telemetry) post() error {
"97497CA758881BD7D56CC2355A2F36B4560102CBC3279AC7B27E5391372C36A31167EB0D06BF3764894AD20"+
"A0554BAFF14C292A40BC252BB9FF008736A0FD1D44E085")

client := &http.Client{}
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
Expand Down

0 comments on commit 4d1d0e0

Please sign in to comment.