Skip to content

Commit

Permalink
Implement heath?all endpoint on Alpha nodes. (#4535)
Browse files Browse the repository at this point in the history
  • Loading branch information
parasssh authored Jan 11, 2020
1 parent f98fe35 commit 48c79ff
Show file tree
Hide file tree
Showing 11 changed files with 905 additions and 333 deletions.
8 changes: 5 additions & 3 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Node struct {
_raft raft.Node

// Fields which are never changed after init.
StartTime time.Time
Cfg *raft.Config
MyAddr string
Id uint64
Expand All @@ -84,9 +85,10 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
x.Check(err)

n := &Node{
Id: rc.Id,
MyAddr: rc.Addr,
Store: store,
StartTime: time.Now(),
Id: rc.Id,
MyAddr: rc.Addr,
Store: store,
Cfg: &raft.Config{
ID: rc.Id,
ElectionTick: 20, // 2s if we call Tick() every 100 ms.
Expand Down
38 changes: 32 additions & 6 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ type Pool struct {
// messages in the same TCP stream.
conn *grpc.ClientConn

lastEcho time.Time
Addr string
closer *y.Closer
lastEcho time.Time
Addr string
closer *y.Closer
healthInfo pb.HealthInfo
}

// Pools manages a concurrency-safe set of Pool.
Expand Down Expand Up @@ -85,6 +86,17 @@ func (p *Pools) Get(addr string) (*Pool, error) {
return pool, nil
}

// GetAll returns all pool entries.
func (p *Pools) GetAll() []*Pool {
p.RLock()
defer p.RUnlock()
var pool []*Pool
for _, v := range p.all {
pool = append(pool, v)
}
return pool
}

// RemoveInvalid removes invalid nodes from the list of pools.
func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
// Keeps track of valid IP addresses, assigned to active nodes. We do this
Expand Down Expand Up @@ -212,13 +224,15 @@ func (p *Pool) listenToHeartbeat() error {

// This loop can block indefinitely as long as it keeps on receiving pings back.
for {
_, err := s.Recv()
if err != nil {
res, err := s.Recv()
if err != nil || res == nil {
return err
}

// We do this periodic stream receive based approach to defend against network partitions.
p.Lock()
p.lastEcho = time.Now()
p.healthInfo = *res
p.Unlock()
}
}
Expand All @@ -235,7 +249,7 @@ func (p *Pool) MonitorHealth() {
default:
err := p.listenToHeartbeat()
if lastErr != nil && err == nil {
glog.Infof("Connection established with %v\n", p.Addr)
glog.Infof("Connection re-established with %v\n", p.Addr)
} else if err != nil && lastErr == nil {
glog.Warningf("Connection lost with %v. Error: %v\n", p.Addr, err)
}
Expand All @@ -255,3 +269,15 @@ func (p *Pool) IsHealthy() bool {
defer p.RUnlock()
return time.Since(p.lastEcho) < 4*echoDuration
}

// HealthInfo returns the healthinfo.
func (p *Pool) HealthInfo() pb.HealthInfo {
p.RLock()
defer p.RUnlock()
p.healthInfo.Status = "healthy"
if !p.IsHealthy() {
p.healthInfo.Status = "unhealthy"
}
p.healthInfo.LastEcho = p.lastEcho.Unix()
return p.healthInfo
}
23 changes: 20 additions & 3 deletions conn/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/binary"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -271,18 +272,34 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {

// Heartbeat rpc call is used to check connection with other workers after worker
// tcp server for this instance starts.
func (w *RaftServer) Heartbeat(in *api.Payload, stream pb.Raft_HeartbeatServer) error {
func (w *RaftServer) Heartbeat(_ *api.Payload, stream pb.Raft_HeartbeatServer) error {
ticker := time.NewTicker(echoDuration)
defer ticker.Stop()

node := w.GetNode()
if node == nil {
return ErrNoNode
}
info := pb.HealthInfo{
Instance: "alpha",
Addr: node.MyAddr,
Group: strconv.Itoa(int(node.RaftContext.GetGroup())),
Version: x.Version(),
Uptime: int64(time.Since(node.StartTime) / time.Second),
}
if info.Group == "0" {
info.Instance = "zero"
}

ctx := stream.Context()
out := &api.Payload{Data: []byte("beat")}

for {
info.Uptime = int64(time.Since(node.StartTime) / time.Second)
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := stream.Send(out); err != nil {
if err := stream.Send(&info); err != nil {
return err
}
}
Expand Down
24 changes: 22 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
bindall bool

// used for computing uptime
beginTime = time.Now()
startTime = time.Now()

// Alpha is the sub-command invoked when running "dgraph alpha".
Alpha x.SubCommand
Expand Down Expand Up @@ -284,6 +284,25 @@ func grpcPort() int {
func healthCheck(w http.ResponseWriter, r *http.Request) {
x.AddCorsHeaders(w)

if _, ok := r.URL.Query()["all"]; ok {
var err error
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

ctx := attachAccessJwt(context.Background(), r)
var resp *api.Response
if resp, err = (&edgraph.Server{}).HealthAll(ctx); err != nil {
x.SetStatus(w, x.Error, err.Error())
return
}
if resp == nil {
x.SetStatus(w, x.ErrorNoData, "No state information available.")
return
}
_, _ = w.Write(resp.Json)
return
}

_, ok := r.URL.Query()["live"]
if !ok {
if err := x.HealthCheck(); err != nil {
Expand All @@ -303,7 +322,7 @@ func healthCheck(w http.ResponseWriter, r *http.Request) {
}{
Version: x.Version(),
Instance: "alpha",
Uptime: time.Since(beginTime),
Uptime: time.Since(startTime) / time.Second,
}
data, _ := json.Marshal(info)

Expand Down Expand Up @@ -559,6 +578,7 @@ func run() {
AclEnabled: secretFile != "",
SnapshotAfter: Alpha.Conf.GetInt("snapshot_after"),
AbortOlderThan: abortDur,
StartTime: startTime,
}

setupCustomTokenizers()
Expand Down
2 changes: 1 addition & 1 deletion edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error {
return nil
}

func authorizeState(ctx context.Context) error {
func authorizeGroot(ctx context.Context) error {
// always allow access
return nil
}
8 changes: 4 additions & 4 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,16 +784,16 @@ func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error {
return nil
}

// authorizeState authorizes the State operation
func authorizeState(ctx context.Context) error {
// authorizeGroot authorizes the operation for Groot users.
func authorizeGroot(ctx context.Context) error {
if len(worker.Config.HmacSecret) == 0 {
// the user has not turned on the acl feature
return nil
}

var userID string
// doAuthorizeState checks if the user is authorized to perform this API request
doAuthorizeState := func() error {
doAuthorizeGroot := func() error {
userData, err := extractUserAndGroups(ctx)
switch {
case err == errNoJwt:
Expand All @@ -811,7 +811,7 @@ func authorizeState(ctx context.Context) error {
}
}

return doAuthorizeState()
return doAuthorizeGroot()
}

func removePredsFromQuery(gqs []*gql.GraphQuery,
Expand Down
46 changes: 36 additions & 10 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/dgraph-io/dgo/v2/protos/api"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/posting"
Expand Down Expand Up @@ -606,22 +607,47 @@ type queryContext struct {
graphql bool
}

// State handles state requests
func (s *Server) State(ctx context.Context) (*api.Response, error) {
return s.doState(ctx, NeedAuthorize)
}
// HealthAll handles health?all requests.
func (s *Server) HealthAll(ctx context.Context) (*api.Response, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if err := authorizeGroot(ctx); err != nil {
return nil, err
}

func (s *Server) doState(ctx context.Context, authorize int) (
*api.Response, error) {
var healthAll []pb.HealthInfo
pool := conn.GetPools().GetAll()
for _, p := range pool {
healthAll = append(healthAll, p.HealthInfo())
}
// Append self.
healthAll = append(healthAll, pb.HealthInfo{
Instance: "alpha",
Addr: x.WorkerConfig.MyAddr,
Status: "healthy",
Group: strconv.Itoa(int(worker.GroupId())),
Version: x.Version(),
Uptime: int64(time.Since(x.WorkerConfig.StartTime) / time.Second),
LastEcho: time.Now().Unix(),
})

var err error
var jsonOut []byte
if jsonOut, err = json.Marshal(healthAll); err != nil {
return nil, errors.Errorf("Unable to Marshal. Err %v", err)
}
return &api.Response{Json: jsonOut}, nil
}

// State handles state requests
func (s *Server) State(ctx context.Context) (*api.Response, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

if authorize == NeedAuthorize {
if err := authorizeState(ctx); err != nil {
return nil, err
}
if err := authorizeGroot(ctx); err != nil {
return nil, err
}

ms := worker.GetMembershipState()
Expand Down
12 changes: 11 additions & 1 deletion protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ message ConnectionState {
uint64 max_pending = 3; // Used to determine the timstamp for reading after bulk load
}

message HealthInfo {
string Instance = 1;
string Addr = 2;
string Status = 3;
string Group = 4; // string so group = 0 can be printed in JSON.
string Version = 5;
int64 Uptime = 6;
int64 LastEcho = 7;
}

message Tablet {
uint32 group_id = 1; // Served by which group.
string predicate = 2;
Expand Down Expand Up @@ -467,7 +477,7 @@ message RaftBatch {
}

service Raft {
rpc Heartbeat (api.Payload) returns (stream api.Payload) {}
rpc Heartbeat (api.Payload) returns (stream HealthInfo) {}
rpc RaftMessage (stream RaftBatch) returns (api.Payload) {}
rpc JoinCluster (RaftContext) returns (api.Payload) {}
rpc IsPeer (RaftContext) returns (PeerResponse) {}
Expand Down
Loading

0 comments on commit 48c79ff

Please sign in to comment.