Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement heath?all endpoint on Alpha nodes. #4535

Merged
merged 15 commits into from
Jan 11, 2020
8 changes: 5 additions & 3 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Node struct {
_raft raft.Node

// Fields which are never changed after init.
StartTime time.Time
Cfg *raft.Config
MyAddr string
Id uint64
Expand All @@ -84,9 +85,10 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
x.Check(err)

n := &Node{
Id: rc.Id,
MyAddr: rc.Addr,
Store: store,
StartTime: time.Now(),
Id: rc.Id,
MyAddr: rc.Addr,
Store: store,
Cfg: &raft.Config{
ID: rc.Id,
ElectionTick: 20, // 2s if we call Tick() every 100 ms.
Expand Down
38 changes: 32 additions & 6 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ type Pool struct {
// messages in the same TCP stream.
conn *grpc.ClientConn

lastEcho time.Time
Addr string
closer *y.Closer
lastEcho time.Time
Addr string
closer *y.Closer
healthInfo pb.HealthInfo
}

// Pools manages a concurrency-safe set of Pool.
Expand Down Expand Up @@ -85,6 +86,17 @@ func (p *Pools) Get(addr string) (*Pool, error) {
return pool, nil
}

// GetAll returns all pool entries.
func (p *Pools) GetAll() []*Pool {
p.RLock()
defer p.RUnlock()
var pool []*Pool
for _, v := range p.all {
pool = append(pool, v)
}
return pool
}

// RemoveInvalid removes invalid nodes from the list of pools.
func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
// Keeps track of valid IP addresses, assigned to active nodes. We do this
Expand Down Expand Up @@ -212,13 +224,15 @@ func (p *Pool) listenToHeartbeat() error {

// This loop can block indefinitely as long as it keeps on receiving pings back.
for {
_, err := s.Recv()
if err != nil {
res, err := s.Recv()
if err != nil || res == nil {
return err
}

// We do this periodic stream receive based approach to defend against network partitions.
p.Lock()
p.lastEcho = time.Now()
p.healthInfo = *res
p.Unlock()
}
}
Expand All @@ -235,7 +249,7 @@ func (p *Pool) MonitorHealth() {
default:
err := p.listenToHeartbeat()
if lastErr != nil && err == nil {
glog.Infof("Connection established with %v\n", p.Addr)
glog.Infof("Connection re-established with %v\n", p.Addr)
} else if err != nil && lastErr == nil {
glog.Warningf("Connection lost with %v. Error: %v\n", p.Addr, err)
}
Expand All @@ -255,3 +269,15 @@ func (p *Pool) IsHealthy() bool {
defer p.RUnlock()
return time.Since(p.lastEcho) < 4*echoDuration
}

// HealthInfo returns the healthinfo.
func (p *Pool) HealthInfo() pb.HealthInfo {
p.RLock()
defer p.RUnlock()
p.healthInfo.Status = "healthy"
if !p.IsHealthy() {
p.healthInfo.Status = "unhealthy"
}
p.healthInfo.LastEcho = p.lastEcho.Unix()
return p.healthInfo
}
23 changes: 20 additions & 3 deletions conn/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/binary"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -271,18 +272,34 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {

// Heartbeat rpc call is used to check connection with other workers after worker
// tcp server for this instance starts.
func (w *RaftServer) Heartbeat(in *api.Payload, stream pb.Raft_HeartbeatServer) error {
func (w *RaftServer) Heartbeat(_ *api.Payload, stream pb.Raft_HeartbeatServer) error {
ticker := time.NewTicker(echoDuration)
defer ticker.Stop()

node := w.GetNode()
if node == nil {
return ErrNoNode
}
info := pb.HealthInfo{
Instance: "alpha",
Addr: node.MyAddr,
Group: strconv.Itoa(int(node.RaftContext.GetGroup())),
Version: x.Version(),
Uptime: int64(time.Since(node.StartTime) / time.Second),
}
if info.Group == "0" {
info.Instance = "zero"
}

ctx := stream.Context()
out := &api.Payload{Data: []byte("beat")}

for {
info.Uptime = int64(time.Since(node.StartTime) / time.Second)
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := stream.Send(out); err != nil {
if err := stream.Send(&info); err != nil {
return err
}
}
Expand Down
24 changes: 22 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var (
bindall bool

// used for computing uptime
beginTime = time.Now()
startTime = time.Now()

// Alpha is the sub-command invoked when running "dgraph alpha".
Alpha x.SubCommand
Expand Down Expand Up @@ -278,6 +278,25 @@ func grpcPort() int {
func healthCheck(w http.ResponseWriter, r *http.Request) {
x.AddCorsHeaders(w)

if _, ok := r.URL.Query()["all"]; ok {
var err error
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

ctx := attachAccessJwt(context.Background(), r)
var resp *api.Response
if resp, err = (&edgraph.Server{}).HealthAll(ctx); err != nil {
x.SetStatus(w, x.Error, err.Error())
return
}
if resp == nil {
x.SetStatus(w, x.ErrorNoData, "No state information available.")
return
}
_, _ = w.Write(resp.Json)
return
}

_, ok := r.URL.Query()["live"]
if !ok {
if err := x.HealthCheck(); err != nil {
Expand All @@ -297,7 +316,7 @@ func healthCheck(w http.ResponseWriter, r *http.Request) {
}{
Version: x.Version(),
Instance: "alpha",
Uptime: time.Since(beginTime),
Uptime: time.Since(startTime) / time.Second,
}
data, _ := json.Marshal(info)

Expand Down Expand Up @@ -532,6 +551,7 @@ func run() {
AclEnabled: secretFile != "",
SnapshotAfter: Alpha.Conf.GetInt("snapshot_after"),
AbortOlderThan: abortDur,
StartTime: startTime,
}

setupCustomTokenizers()
Expand Down
2 changes: 1 addition & 1 deletion edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error {
return nil
}

func authorizeState(ctx context.Context) error {
func authorizeGroot(ctx context.Context) error {
// always allow access
return nil
}
8 changes: 4 additions & 4 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,16 +784,16 @@ func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error {
return nil
}

// authorizeState authorizes the State operation
func authorizeState(ctx context.Context) error {
// authorizeGroot authorizes the operation for Groot users.
func authorizeGroot(ctx context.Context) error {
if len(worker.Config.HmacSecret) == 0 {
// the user has not turned on the acl feature
return nil
}

var userID string
// doAuthorizeState checks if the user is authorized to perform this API request
doAuthorizeState := func() error {
doAuthorizeGroot := func() error {
userData, err := extractUserAndGroups(ctx)
switch {
case err == errNoJwt:
Expand All @@ -811,7 +811,7 @@ func authorizeState(ctx context.Context) error {
}
}

return doAuthorizeState()
return doAuthorizeGroot()
}

func removePredsFromQuery(gqs []*gql.GraphQuery,
Expand Down
46 changes: 36 additions & 10 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/dgraph-io/dgo/v2/protos/api"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/posting"
Expand Down Expand Up @@ -598,22 +599,47 @@ type queryContext struct {
span *trace.Span
}

// State handles state requests
func (s *Server) State(ctx context.Context) (*api.Response, error) {
return s.doState(ctx, NeedAuthorize)
}
// HealthAll handles health?all requests.
func (s *Server) HealthAll(ctx context.Context) (*api.Response, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if err := authorizeGroot(ctx); err != nil {
return nil, err
}

func (s *Server) doState(ctx context.Context, authorize int) (
*api.Response, error) {
var healthAll []pb.HealthInfo
pool := conn.GetPools().GetAll()
for _, p := range pool {
healthAll = append(healthAll, p.HealthInfo())
}
// Append self.
healthAll = append(healthAll, pb.HealthInfo{
Instance: "alpha",
Addr: x.WorkerConfig.MyAddr,
Status: "healthy",
Group: strconv.Itoa(int(worker.GroupId())),
Version: x.Version(),
Uptime: int64(time.Since(x.WorkerConfig.StartTime) / time.Second),
LastEcho: time.Now().Unix(),
})

var err error
var jsonOut []byte
if jsonOut, err = json.Marshal(healthAll); err != nil {
return nil, errors.Errorf("Unable to Marshal. Err %v", err)
}
return &api.Response{Json: jsonOut}, nil
}

// State handles state requests
func (s *Server) State(ctx context.Context) (*api.Response, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

if authorize == NeedAuthorize {
if err := authorizeState(ctx); err != nil {
return nil, err
}
if err := authorizeGroot(ctx); err != nil {
return nil, err
}

ms := worker.GetMembershipState()
Expand Down
12 changes: 11 additions & 1 deletion protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ message ConnectionState {
uint64 max_pending = 3; // Used to determine the timstamp for reading after bulk load
}

message HealthInfo {
string Instance = 1;
string Addr = 2;
string Status = 3;
string Group = 4; // string so group = 0 can be printed in JSON.
string Version = 5;
int64 Uptime = 6;
int64 LastEcho = 7;
}

message Tablet {
uint32 group_id = 1; // Served by which group.
string predicate = 2;
Expand Down Expand Up @@ -467,7 +477,7 @@ message RaftBatch {
}

service Raft {
rpc Heartbeat (api.Payload) returns (stream api.Payload) {}
rpc Heartbeat (api.Payload) returns (stream HealthInfo) {}
rpc RaftMessage (stream RaftBatch) returns (api.Payload) {}
rpc JoinCluster (RaftContext) returns (api.Payload) {}
rpc IsPeer (RaftContext) returns (PeerResponse) {}
Expand Down
Loading