Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-19.0] discovery: Fix tablets removed from healthcheck when topo server GetTablet call fails (#15633) #15681

Merged
merged 3 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (tw *TopologyWatcher) Stop() {

func (tw *TopologyWatcher) loadTablets() {
newTablets := make(map[string]*tabletInfo)
var partialResult bool

// First get the list of all tablets.
tabletInfos, err := tw.getTablets()
Expand All @@ -152,6 +153,7 @@ func (tw *TopologyWatcher) loadTablets() {
// If we get a partial result error, we just log it and process the tablets that we did manage to fetch.
if topo.IsErrType(err, topo.PartialResult) {
log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err)
partialResult = true
} else { // For all other errors, just return.
log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err)
return
Expand Down Expand Up @@ -183,6 +185,18 @@ func (tw *TopologyWatcher) loadTablets() {
}
}

if partialResult {
// We don't want to remove any tablets from the tablets map or the healthcheck if we got a partial result
// because we don't know if they were actually deleted or if we simply failed to fetch them.
// Fill any gaps in the newTablets map using the existing tablets.
for alias, val := range tw.tablets {
if _, ok := newTablets[alias]; !ok {
tabletAliasStrs = append(tabletAliasStrs, alias)
newTablets[alias] = val
}
}
}

for alias, newVal := range newTablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) {
continue
Expand Down
82 changes: 82 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package discovery

import (
"context"
"errors"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -576,3 +577,84 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {

tw.Stop()
}

// TestGetTabletErrorDoesNotRemoveFromHealthcheck checks that a tablet already loaded by the
// topology watcher is still reported by the healthcheck after a later loadTablets call in
// which the topo Get for that tablet fails and only a partial result is available.
func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)

// In-memory topo server with a single cell "aa"; the factory is kept so that
// operation errors can be injected below.
ts, factory := memorytopo.NewServerAndFactory(ctx, "aa")
defer ts.Close()
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
// Reset the watcher operation counters and take a baseline snapshot for checkOpCounts.
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
// The watcher is not started in this test; loadTablets is called directly instead.
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
defer tw.Stop()

// Force fallback to getting tablets individually.
factory.AddOperationError(memorytopo.List, ".*", topo.NewError(topo.NoImplementation, "List not supported"))

// Nothing has been loaded yet: no operations expected and an empty checksum.
counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)

// Add a tablet to the topology.
tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 0,
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": 123,
},
Keyspace: "keyspace",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias)

// First load: the single tablet should be listed and added.
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check the tablet is returned by GetAllTablets().
allTablets := fhc.GetAllTablets()
key1 := TabletToMapKey(tablet1)
assert.Len(t, allTablets, 1)
assert.Contains(t, allTablets, key1)
assert.True(t, proto.Equal(tablet1, allTablets[key1]))

// Add a second tablet to the topology.
tablet2 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 2,
},
Hostname: "host2",
PortMap: map[string]int32{
"vt": 789,
},
Keyspace: "keyspace",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(ctx, tablet2), "CreateTablet failed for %v", tablet2.Alias)

// Cause the Get for the first tablet to fail.
factory.AddOperationError(memorytopo.Get, "tablets/aa-0000000000/Tablet", errors.New("fake error"))

// Ensure that a topo Get error results in a partial results error. If not, the rest of this test is invalid.
_, err := ts.GetTabletsByCell(ctx, "aa", &topo.GetTabletsByCellOptions{})
require.ErrorContains(t, err, "partial result")

// Now force the error during loadTablets.
// The expected map lists only ListTablets and AddTablet (for the second tablet).
// NOTE(review): presumably checkOpCounts treats unlisted operations, such as a removal,
// as expected-zero — confirm against the helper.
tw.loadTablets()
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1})
checkChecksum(t, tw, 2762153755)

// Ensure the first tablet is still returned by GetAllTablets() and the second tablet has been added.
allTablets = fhc.GetAllTablets()
key2 := TabletToMapKey(tablet2)
assert.Len(t, allTablets, 2)
assert.Contains(t, allTablets, key1)
assert.Contains(t, allTablets, key2)
assert.True(t, proto.Equal(tablet1, allTablets[key1]))
assert.True(t, proto.Equal(tablet2, allTablets[key2]))
}
2 changes: 1 addition & 1 deletion go/vt/topo/keyspace_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestServerGetServingShards(t *testing.T) {
require.NotNil(t, stats)

if tt.fallback {
factory.SetListError(errNoListImpl)
factory.AddOperationError(memorytopo.List, ".*", errNoListImpl)
}

err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})
Expand Down
3 changes: 3 additions & 0 deletions go/vt/topo/memorytopo/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(ListDir, dirPath); err != nil {
return nil, err

Check warning on line 43 in go/vt/topo/memorytopo/directory.go

View check run for this annotation

Codecov / codecov/patch

go/vt/topo/memorytopo/directory.go#L43

Added line #L43 was not covered by tests
}

isRoot := false
if dirPath == "" || dirPath == "/" {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
c.factory.mu.Lock()
defer c.factory.mu.Unlock()

if err := c.factory.getOperationError(NewLeaderParticipation, id); err != nil {
return nil, err

Check warning on line 39 in go/vt/topo/memorytopo/election.go

View check run for this annotation

Codecov / codecov/patch

go/vt/topo/memorytopo/election.go#L39

Added line #L39 was not covered by tests
}

// Make sure the global path exists.
electionPath := path.Join(electionsPath, name)
if n := c.factory.getOrCreatePath(c.cell, electionPath); n == nil {
Expand Down
16 changes: 14 additions & 2 deletions go/vt/topo/memorytopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(Create, filePath); err != nil {
return nil, err

Check warning on line 50 in go/vt/topo/memorytopo/file.go

View check run for this annotation

Codecov / codecov/patch

go/vt/topo/memorytopo/file.go#L50

Added line #L50 was not covered by tests
}

// Get the parent dir.
dir, file := path.Split(filePath)
Expand Down Expand Up @@ -92,6 +95,9 @@
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(Update, filePath); err != nil {
return nil, err

Check warning on line 99 in go/vt/topo/memorytopo/file.go

View check run for this annotation

Codecov / codecov/patch

go/vt/topo/memorytopo/file.go#L99

Added line #L99 was not covered by tests
}

// Get the parent dir, we'll need it in case of creation.
dir, file := path.Split(filePath)
Expand Down Expand Up @@ -168,6 +174,9 @@
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(Get, filePath); err != nil {
return nil, nil, err
}

// Get the node.
n := c.factory.nodeByPath(c.cell, filePath)
Expand Down Expand Up @@ -195,8 +204,8 @@
if c.factory.err != nil {
return nil, c.factory.err
}
if c.factory.listErr != nil {
return nil, c.factory.listErr
if err := c.factory.getOperationError(List, filePathPrefix); err != nil {
return nil, err
}

dir, file := path.Split(filePathPrefix)
Expand Down Expand Up @@ -259,6 +268,9 @@
if c.factory.err != nil {
return c.factory.err
}
if err := c.factory.getOperationError(Delete, filePath); err != nil {
return err

Check warning on line 272 in go/vt/topo/memorytopo/file.go

View check run for this annotation

Codecov / codecov/patch

go/vt/topo/memorytopo/file.go#L272

Added line #L272 was not covered by tests
}

// Get the parent dir.
dir, file := path.Split(filePath)
Expand Down
14 changes: 14 additions & 0 deletions go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,27 @@
func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
// Record the call in the factory's per-call counters.
c.factory.callstats.Add([]string{"TryLock"}, 1)

// Look up any error injected for TryLock on dirPath while holding the factory mutex.
// The mutex is released before delegating because c.Lock acquires the same mutex itself.
c.factory.mu.Lock()
err := c.factory.getOperationError(TryLock, dirPath)
c.factory.mu.Unlock()
if err != nil {
return nil, err

Check warning on line 51 in go/vt/topo/memorytopo/lock.go

View check run for this annotation

Codecov / codecov/patch

go/vt/topo/memorytopo/lock.go#L47-L51

Added lines #L47 - L51 were not covered by tests
}

// No injected TryLock error: delegate to Lock, which also records a "Lock" call and
// applies its own injected-error check for the Lock operation.
return c.Lock(ctx, dirPath, contents)
}

// Lock is part of the topo.Conn interface.
// If an error has been injected for the Lock operation on a path pattern matching
// dirPath, that error is returned and c.lock is not called.
func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
// Record the call in the factory's per-call counters.
c.factory.callstats.Add([]string{"Lock"}, 1)

// Check for an injected error under the factory mutex; the mutex is released
// again before c.lock is called.
c.factory.mu.Lock()
err := c.factory.getOperationError(Lock, dirPath)
c.factory.mu.Unlock()
if err != nil {
return nil, err

Check warning on line 65 in go/vt/topo/memorytopo/lock.go

View check run for this annotation

Codecov / codecov/patch

go/vt/topo/memorytopo/lock.go#L65

Added line #L65 was not covered by tests
}

return c.lock(ctx, dirPath, contents)
}

Expand Down
57 changes: 49 additions & 8 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"errors"
"math/rand"
"regexp"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -50,6 +51,25 @@ const (
UnreachableServerAddr = "unreachable"
)

// Operation identifies a single operation defined by topo.Conn.
type Operation int

// The topo.Conn operations, numbered consecutively from zero in declaration order.
const (
	ListDir Operation = iota
	Create
	Update
	Get
	List
	Delete
	Lock
	TryLock
	Watch
	WatchRecursive
	NewLeaderParticipation
	Close
)

// Factory is a memory-based implementation of topo.Factory. It
// takes a file-system like approach, with directories at each level
// being an actual directory node. This is meant to be closer to
Expand All @@ -72,14 +92,20 @@ type Factory struct {
// err is used for testing purposes to force queries / watches
// to return the given error
err error
// listErr is used for testing purposes to fake errors from
// calls to List.
listErr error
// operationErrors is used for testing purposes to fake errors from
// operations and paths matching the spec
operationErrors map[Operation][]errorSpec
// callstats allows us to keep track of how many topo.Conn calls
// we make (Create, Get, Update, Delete, List, ListDir, etc).
callstats *stats.CountersWithMultiLabels
}

// errorSpec describes one injected error: the operation it was registered for,
// the pattern a path must match, and the error to return on a match.
type errorSpec struct {
// op is the operation this spec applies to.
op Operation
// pathPattern is matched against the path passed to the operation.
pathPattern *regexp.Regexp
// err is the error returned when pathPattern matches.
err error
}

// HasGlobalReadOnlyCell is part of the topo.Factory interface.
func (f *Factory) HasGlobalReadOnlyCell(serverAddr, root string) bool {
return false
Expand Down Expand Up @@ -248,9 +274,10 @@ func (n *node) PropagateWatchError(err error) {
// in case of a problem.
func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *Factory) {
f := &Factory{
cells: make(map[string]*node),
generation: uint64(rand.Int63n(1 << 60)),
callstats: stats.NewCountersWithMultiLabels("", "", []string{"Call"}),
cells: make(map[string]*node),
generation: uint64(rand.Int63n(1 << 60)),
callstats: stats.NewCountersWithMultiLabels("", "", []string{"Call"}),
operationErrors: make(map[Operation][]errorSpec),
}
f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil)

Expand Down Expand Up @@ -363,9 +390,23 @@ func (f *Factory) recursiveDelete(n *node) {
}
}

func (f *Factory) SetListError(err error) {
func (f *Factory) AddOperationError(op Operation, pathPattern string, err error) {
f.mu.Lock()
defer f.mu.Unlock()

f.listErr = err
f.operationErrors[op] = append(f.operationErrors[op], errorSpec{
op: op,
pathPattern: regexp.MustCompile(pathPattern),
err: err,
})
}

// getOperationError walks the errors injected for op, in the order they were
// added, and returns the error of the first spec whose pattern matches path.
// It returns nil when nothing matches. It does no locking of its own.
func (f *Factory) getOperationError(op Operation, path string) error {
	for _, candidate := range f.operationErrors[op] {
		if !candidate.pathPattern.MatchString(path) {
			continue
		}
		return candidate.err
	}
	return nil
}
6 changes: 6 additions & 0 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(Watch, filePath); err != nil {
return nil, nil, err

Check warning on line 41 in go/vt/topo/memorytopo/watch.go

View check run for this annotation

Codecov / codecov/patch

go/vt/topo/memorytopo/watch.go#L41

Added line #L41 was not covered by tests
}

n := c.factory.nodeByPath(c.cell, filePath)
if n == nil {
Expand Down Expand Up @@ -89,6 +92,9 @@
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(WatchRecursive, dirpath); err != nil {
return nil, nil, err

Check warning on line 96 in go/vt/topo/memorytopo/watch.go

View check run for this annotation

Codecov / codecov/patch

go/vt/topo/memorytopo/watch.go#L96

Added line #L96 was not covered by tests
}

n := c.factory.getOrCreatePath(c.cell, dirpath)
if n == nil {
Expand Down
12 changes: 9 additions & 3 deletions go/vt/topo/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@

// GetTabletsByCell returns all the tablets in the cell.
// It returns ErrNoNode if the cell doesn't exist.
// It returns ErrPartialResult if some tablets couldn't be read. The results in the slice are incomplete.
// It returns (nil, nil) if the cell exists, but there are no tablets in it.
func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) {
// If the cell doesn't exist, this will return ErrNoNode.
Expand Down Expand Up @@ -330,6 +331,7 @@
// GetTabletsIndividuallyByCell returns a sorted list of tablets for topo servers that do not
// directly support the topoConn.List() functionality.
// It returns ErrNoNode if the cell doesn't exist.
// It returns ErrPartialResult if some tablets couldn't be read. The results in the slice are incomplete.
// It returns (nil, nil) if the cell exists, but there are no tablets in it.
func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) {
// If the cell doesn't exist, this will return ErrNoNode.
Expand All @@ -339,10 +341,14 @@
}
sort.Sort(topoproto.TabletAliasList(aliases))

var partialResultErr error
tabletMap, err := ts.GetTabletMap(ctx, aliases, opt)
if err != nil {
// we got another error than topo.ErrNoNode
return nil, err
if IsErrType(err, PartialResult) {
partialResultErr = err
} else {
return nil, err

Check warning on line 350 in go/vt/topo/tablet.go

View check run for this annotation

Codecov / codecov/patch

go/vt/topo/tablet.go#L350

Added line #L350 was not covered by tests
}
}
tablets := make([]*TabletInfo, 0, len(aliases))
for _, tabletAlias := range aliases {
Expand All @@ -356,7 +362,7 @@
}
}

return tablets, nil
return tablets, partialResultErr
}

// UpdateTablet updates the tablet data only - not associated replication paths.
Expand Down
Loading
Loading