Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DeleteCellInfo(force=true) with downed local topo #9081

Merged
merged 3 commits into from
Oct 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions go/vt/topo/cell_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,21 @@ func (ts *Server) DeleteCellInfo(ctx context.Context, cell string, force bool) e
if !force {
return vterrors.Wrap(err, "can't list SrvKeyspace entries in the cell; use -force flag to continue anyway (e.g. if cell-local topo was already permanently shut down)")
}

select {
case <-ctx.Done():
// If our context has expired and we got an error back from
// GetSrvKeyspaceNames, we assume that call failed because the
// local cell topo was down. If force=true, then we make a new
// background context to cleanup from the global topo. Otherwise a
// local-down-topo scenario would mean we never can delete it.
// (see https://github.com/vitessio/vitess/issues/8220).
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), *RemoteOperationTimeout)
defer cancel()
default:
// Context still has some time left, no need to make a new one.
}
}

filePath := pathForCellInfo(cell)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/topo/memorytopo/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (

// ListDir is part of the topo.Conn interface.
func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.DirEntry, error) {
if err := c.dial(ctx); err != nil {
return nil, err
}

c.factory.mu.Lock()
defer c.factory.mu.Unlock()

Expand Down
16 changes: 16 additions & 0 deletions go/vt/topo/memorytopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ import (

// Create is part of topo.Conn interface.
func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (topo.Version, error) {
if err := c.dial(ctx); err != nil {
return nil, err
}

if contents == nil {
contents = []byte{}
}
Expand Down Expand Up @@ -61,6 +65,10 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to

// Update is part of topo.Conn interface.
func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, version topo.Version) (topo.Version, error) {
if err := c.dial(ctx); err != nil {
return nil, err
}

if contents == nil {
contents = []byte{}
}
Expand Down Expand Up @@ -125,6 +133,10 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver

// Get is part of topo.Conn interface.
func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, error) {
if err := c.dial(ctx); err != nil {
return nil, nil, err
}

c.factory.mu.Lock()
defer c.factory.mu.Unlock()

Expand All @@ -146,6 +158,10 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version,

// Delete is part of topo.Conn interface.
func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version) error {
if err := c.dial(ctx); err != nil {
return err
}

c.factory.mu.Lock()
defer c.factory.mu.Unlock()

Expand Down
4 changes: 4 additions & 0 deletions go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type memoryTopoLockDescriptor struct {
// Lock is part of the topo.Conn interface.
func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
for {
if err := c.dial(ctx); err != nil {
return nil, err
}

c.factory.mu.Lock()

if c.factory.err != nil {
Expand Down
35 changes: 29 additions & 6 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ const (
electionsPath = "elections"
)

const (
// UnreachableServerAddr is a sentinel value for CellInfo.ServerAddr.
// If a memorytopo topo.Conn is created with this serverAddr then every
// method on that Conn which takes a context will simply block until the
// context finishes, and return ctx.Err(), in order to simulate an
// unreachable local cell for testing.
UnreachableServerAddr = "unreachable"
)

var (
nextWatchIndex = 0
)
Expand Down Expand Up @@ -79,8 +88,9 @@ func (f *Factory) Create(cell, serverAddr, root string) (topo.Conn, error) {
return nil, topo.NewError(topo.NoNode, cell)
}
return &Conn{
factory: f,
cell: cell,
factory: f,
cell: cell,
serverAddr: serverAddr,
}, nil
}

Expand Down Expand Up @@ -110,11 +120,24 @@ func (f *Factory) Unlock() {
f.mu.Unlock()
}

// Conn implements the topo.Conn interface. It remembers the cell, and
// points at the Factory that has all the data.
// Conn implements the topo.Conn interface. It remembers the cell and serverAddr,
// and points at the Factory that has all the data.
type Conn struct {
factory *Factory
cell string
factory *Factory
cell string
serverAddr string
}

// dial returns immediately, unless the Conn points to the sentinel
// UnreachableServerAddr, in which case it will block until the context expires
// and return the context's error.
func (c *Conn) dial(ctx context.Context) error {
if c.serverAddr == UnreachableServerAddr {
<-ctx.Done()
return ctx.Err()
}

return nil
}

// Close is part of the topo.Conn interface.
Expand Down
50 changes: 50 additions & 0 deletions go/vt/topo/topotests/cell_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -226,3 +227,52 @@ func TestExpandCells(t *testing.T) {
}
})
}

func TestDeleteCellInfo(t *testing.T) {
ctx := context.Background()
ts := memorytopo.NewServer("zone1", "unreachable")

err := ts.UpdateCellInfoFields(ctx, "unreachable", func(ci *topodatapb.CellInfo) error {
ci.ServerAddress = memorytopo.UnreachableServerAddr
return nil
})
require.NoError(t, err, "failed to update cell to point at unreachable addr")

tests := []struct {
force bool
shouldErr bool
shouldExist bool
}{
{
force: false,
shouldErr: true,
shouldExist: true,
},
{
force: true,
shouldErr: false,
shouldExist: false,
},
}
for _, tt := range tests {
func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()

err := ts.DeleteCellInfo(ctx, "unreachable", tt.force)
if tt.shouldErr {
assert.Error(t, err, "force=%t", tt.force)
} else {
assert.NoError(t, err, "force=%t", tt.force)
}

ci, err := ts.GetCellInfo(ctx, "unreachable", true /* strongRead */)
if tt.shouldExist {
assert.NoError(t, err)
assert.NotNil(t, ci)
} else {
assert.True(t, topo.IsErrType(err, topo.NoNode), "expected cell %q to not exist", "unreachable")
}
}()
}
}