Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tablet error handling. #3323

Merged
merged 6 commits into from
Apr 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2064,7 +2064,7 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
return
}
result, err := worker.ProcessTaskOverNetwork(ctx, taskQuery)
if err != nil && strings.Contains(err.Error(), worker.ErrUnservedTabletMessage) {
if err != nil && strings.Contains(err.Error(), worker.ErrNonExistentTabletMessage) {
sg.UnknownAttr = true
} else if err != nil {
rch <- err
Expand Down
15 changes: 11 additions & 4 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ func (g *groupi) ServesTabletReadOnly(key string) (bool, error) {

// Do not modify the returned Tablet
func (g *groupi) Tablet(key string) (*pb.Tablet, error) {
emptyTablet := pb.Tablet{}

// TODO: Remove all this later, create a membership state and apply it
g.RLock()
tablet, ok := g.tablets[key]
Expand All @@ -392,11 +394,16 @@ func (g *groupi) Tablet(key string) (*pb.Tablet, error) {
out, err := zc.ShouldServe(context.Background(), tablet)
if err != nil {
glog.Errorf("Error while ShouldServe grpc call %v", err)
return nil, err
return &emptyTablet, err
}

// Do not store tablets with group ID 0, as they are just dummy tablets for
// predicates that do no exist.
if out.GroupId > 0 {
g.Lock()
g.tablets[key] = out
g.Unlock()
}
g.Lock()
g.tablets[key] = out
g.Unlock()

if out.GroupId == groups().groupId() {
glog.Infof("Serving tablet for: %v\n", key)
Expand Down
7 changes: 4 additions & 3 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ import (
)

var (
ErrUnservedTabletMessage = "Tablet isn't being served by this instance"
errUnservedTablet = x.Errorf(ErrUnservedTabletMessage)
ErrNonExistentTabletMessage = "Requested predicate is not being served by any tablet"
errNonExistentTablet = x.Errorf(ErrNonExistentTabletMessage)
errUnservedTablet = x.Errorf("Tablet isn't being served by this instance")
)

func isStarAll(v []byte) bool {
Expand Down Expand Up @@ -533,7 +534,7 @@ func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, e
if gid == 0 {
span.Annotatef(nil, "state: %+v", groups().state)
span.Annotatef(nil, "Group id zero for mutation: %+v", mu)
return tctx, errUnservedTablet
return tctx, errNonExistentTablet
}
mu.StartTs = m.StartTs
go proposeOrSend(ctx, gid, mu, resCh)
Expand Down
6 changes: 4 additions & 2 deletions worker/predicate_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,11 @@ func (w *grpcWorker) MovePredicate(ctx context.Context,
if err := posting.Oracle().WaitForTs(ctx, in.TxnTs); err != nil {
return &emptyPayload, x.Errorf("While waiting for txn ts: %d. Error: %v", in.TxnTs, err)
}
if servesTablet, err := groups().ServesTablet(in.Predicate); err != nil {
if gid, err := groups().BelongsTo(in.Predicate); err != nil {
return &emptyPayload, err
} else if !servesTablet {
} else if gid == 0 {
return &emptyPayload, errNonExistentTablet
} else if gid != groups().groupId() {
return &emptyPayload, errUnservedTablet
}

Expand Down
4 changes: 3 additions & 1 deletion worker/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr
checkTablet := func(pred string) error {
if tablet, err := groups().Tablet(pred); err != nil {
return err
} else if tablet == nil || tablet.GroupId != groups().groupId() {
} else if tablet == nil || tablet.GroupId == 0 {
return errNonExistentTablet
} else if tablet.GroupId != groups().groupId() {
return errUnservedTablet
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions worker/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,13 @@ func populateSchema(attr string, fields []string) *api.SchemaNode {
// empty then it adds all known groups
func addToSchemaMap(schemaMap map[uint32]*pb.SchemaRequest, schema *pb.SchemaRequest) error {
for _, attr := range schema.Predicates {
gid, err := groups().BelongsTo(attr)
gid, err := groups().BelongsToReadOnly(attr)
if err != nil {
return err
}
if gid == 0 {
continue
}

s := schemaMap[gid]
if s == nil {
Expand Down Expand Up @@ -194,9 +197,6 @@ func GetSchemaOverNetwork(ctx context.Context, schema *pb.SchemaRequest) ([]*api
var schemaNodes []*api.SchemaNode

for gid, s := range schemaMap {
if gid == 0 {
return schemaNodes, errUnservedTablet
}
go getSchemaOverNetwork(ctx, gid, s, results)
}

Expand Down
15 changes: 10 additions & 5 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error
if err != nil {
return &emptyResult, err
} else if gid == 0 {
return &emptyResult, errUnservedTablet
return &emptyResult, errNonExistentTablet
}

span := otrace.FromContext(ctx)
Expand Down Expand Up @@ -734,13 +734,16 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro
// zero, then it wouldn't know that this group is no longer serving this
// predicate. There's no issue if a we are serving a particular tablet and
// we get partitioned away from group zero as long as it's not removed.
// ServesTabletReadOnly is called instead of ServesTablet to prevent this
// alpha from requesting to serve this tablet.
if servesTablet, err := groups().ServesTabletReadOnly(q.Attr); err != nil {
// BelongsToReadOnly is called instead of BelongsTo to prevent this alpha
// from requesting to serve this tablet.
if gid, err := groups().BelongsToReadOnly(q.Attr); err != nil {
return &emptyResult, err
} else if !servesTablet {
} else if gid == 0 {
return &emptyResult, errNonExistentTablet
} else if gid != groups().groupId() {
return &emptyResult, errUnservedTablet
}

var qs queryState
if q.Cache == UseTxnCache {
qs.cache = posting.Oracle().CacheAt(q.ReadTs)
Expand Down Expand Up @@ -1629,6 +1632,8 @@ func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, er
if err != nil {
return &emptyResult, err
} else if gid == 0 {
return &emptyResult, errNonExistentTablet
} else if gid != groups().groupId() {
return &emptyResult, errUnservedTablet
}

Expand Down