roachtest: clean up failure mode handling in failover tests #93254

Merged on Dec 9, 2022 (1 commit)
225 changes: 96 additions & 129 deletions pkg/cmd/roachtest/tests/failover.go
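For orientation before the hunks: this PR replaces the old per-mode structs (each implementing a `failureMode` interface that embedded `fmt.Stringer`) with a plain `failureMode` string enum chosen at registration time, plus a `failer` interface built by `makeFailer` inside each test. Below is a minimal sketch of the registration side, condensed from the diff that follows; `registerFailoverSketch` is an illustrative name, `TestSpec` fields such as the cluster spec are elided, and the liveness and system-non-liveness variants follow the same pattern.

```go
// Sketch only: condensed from the diff below, not the full implementation.
func registerFailoverSketch(r registry.Registry) {
	for _, failureMode := range []failureMode{
		failureModeBlackhole,
		failureModeBlackholeRecv,
		failureModeBlackholeSend,
		failureModeCrash,
	} {
		failureMode := failureMode // pin loop variable for the closure below
		r.Add(registry.TestSpec{
			// failureMode is a string type, so %s formats it directly and the
			// old fmt.Stringer requirement on each failure mode goes away.
			Name:  fmt.Sprintf("failover/non-system/%s", failureMode),
			Owner: registry.OwnerKV,
			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
				runFailoverNonSystem(ctx, t, c, failureMode)
			},
		})
	}
}
```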
@@ -33,12 +33,11 @@ import (

func registerFailover(r registry.Registry) {
for _, failureMode := range []failureMode{
&failureModeBlackhole{},
&failureModeBlackholeRecv{},
&failureModeBlackholeSend{},
&failureModeCrash{},
failureModeBlackhole,
failureModeBlackholeRecv,
failureModeBlackholeSend,
failureModeCrash,
} {
failureMode := failureMode // pin loop variable
r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/non-system/%s", failureMode),
Owner: registry.OwnerKV,
@@ -48,15 +47,6 @@ func registerFailover(r registry.Registry) {
runFailoverNonSystem(ctx, t, c, failureMode)
},
})
}

for _, failureMode := range []failureMode{
&failureModeBlackhole{},
&failureModeBlackholeRecv{},
&failureModeBlackholeSend{},
&failureModeCrash{},
} {
failureMode := failureMode // pin loop variable
r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/liveness/%s", failureMode),
Owner: registry.OwnerKV,
@@ -66,15 +56,6 @@ func registerFailover(r registry.Registry) {
runFailoverLiveness(ctx, t, c, failureMode)
},
})
}

for _, failureMode := range []failureMode{
&failureModeBlackhole{},
&failureModeBlackholeRecv{},
&failureModeBlackholeSend{},
&failureModeCrash{},
} {
failureMode := failureMode // pin loop variable
r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/system-non-liveness/%s", failureMode),
Owner: registry.OwnerKV,
@@ -124,14 +105,10 @@ func runFailoverNonSystem(
// Create cluster.
opts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
failer := makeFailer(t, failureMode, opts, settings)
c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), opts, settings, c.Range(1, 6))

if f, ok := failureMode.(*failureModeCrash); ok {
f.startOpts = opts
f.startSettings = settings
}

conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()

@@ -186,7 +163,7 @@ func runFailoverNonSystem(
})

// Start a worker to fail and recover n4-n6 in order.
defer failureMode.Cleanup(ctx, t, c)
defer failer.Cleanup(ctx, t, c)

m.Go(func(ctx context.Context) error {
var raftCfg base.RaftConfig
@@ -219,10 +196,10 @@ func runFailoverNonSystem(
}

t.Status(fmt.Sprintf("failing n%d (%s)", node, failureMode))
if failureMode.ExpectDeath() {
if failer.ExpectDeath() {
m.ExpectDeath()
}
failureMode.Fail(ctx, t, c, node)
failer.Fail(ctx, t, c, node)

select {
case <-ticker.C:
@@ -231,7 +208,7 @@ func runFailoverNonSystem(
}

t.Status(fmt.Sprintf("recovering n%d (%s)", node, failureMode))
failureMode.Recover(ctx, t, c, node)
failer.Recover(ctx, t, c, node)
}
}
return nil
@@ -282,14 +259,10 @@ func runFailoverLiveness(
// Create cluster.
opts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
failer := makeFailer(t, failureMode, opts, settings)
c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), opts, settings, c.Range(1, 4))

if f, ok := failureMode.(*failureModeCrash); ok {
f.startOpts = opts
f.startSettings = settings
}

conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()

@@ -370,7 +343,7 @@ func runFailoverLiveness(
startTime := timeutil.Now()

// Start a worker to fail and recover n4.
defer failureMode.Cleanup(ctx, t, c)
defer failer.Cleanup(ctx, t, c)

m.Go(func(ctx context.Context) error {
var raftCfg base.RaftConfig
@@ -402,10 +375,10 @@ func runFailoverLiveness(
}

t.Status(fmt.Sprintf("failing n%d (%s)", 4, failureMode))
if failureMode.ExpectDeath() {
if failer.ExpectDeath() {
m.ExpectDeath()
}
failureMode.Fail(ctx, t, c, 4)
failer.Fail(ctx, t, c, 4)

select {
case <-ticker.C:
@@ -414,7 +387,7 @@ func runFailoverLiveness(
}

t.Status(fmt.Sprintf("recovering n%d (%s)", 4, failureMode))
failureMode.Recover(ctx, t, c, 4)
failer.Recover(ctx, t, c, 4)
relocateLeases(t, ctx, conn, `range_id = 2`, 4)
}
return nil
@@ -485,14 +458,10 @@ func runFailoverSystemNonLiveness(
// Create cluster.
opts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
failer := makeFailer(t, failureMode, opts, settings)
c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), opts, settings, c.Range(1, 6))

if f, ok := failureMode.(*failureModeCrash); ok {
f.startOpts = opts
f.startSettings = settings
}

conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()

@@ -555,7 +524,7 @@ func runFailoverSystemNonLiveness(
})

// Start a worker to fail and recover n4-n6 in order.
defer failureMode.Cleanup(ctx, t, c)
defer failer.Cleanup(ctx, t, c)

m.Go(func(ctx context.Context) error {
var raftCfg base.RaftConfig
@@ -590,10 +559,10 @@ func runFailoverSystemNonLiveness(
}

t.Status(fmt.Sprintf("failing n%d (%s)", node, failureMode))
if failureMode.ExpectDeath() {
if failer.ExpectDeath() {
m.ExpectDeath()
}
failureMode.Fail(ctx, t, c, node)
failer.Fail(ctx, t, c, node)

select {
case <-ticker.C:
@@ -602,18 +571,55 @@ func runFailoverSystemNonLiveness(
}

t.Status(fmt.Sprintf("recovering n%d (%s)", node, failureMode))
failureMode.Recover(ctx, t, c, node)
failer.Recover(ctx, t, c, node)
}
}
return nil
})
m.Wait()
}

// failureMode fails and recovers a given node in some particular way.
type failureMode interface {
fmt.Stringer
// failureMode specifies a failure mode.
type failureMode string

const (
failureModeCrash failureMode = "crash"
failureModeBlackhole failureMode = "blackhole"
failureModeBlackholeRecv failureMode = "blackhole-recv"
failureModeBlackholeSend failureMode = "blackhole-send"
)

// makeFailer creates a new failer for the given failureMode.
func makeFailer(
t test.Test, failureMode failureMode, opts option.StartOpts, settings install.ClusterSettings,
) failer {
switch failureMode {
case failureModeCrash:
return &crashFailer{
startOpts: opts,
startSettings: settings,
}
case failureModeBlackhole:
return &blackholeFailer{
input: true,
output: true,
}
case failureModeBlackholeRecv:
return &blackholeFailer{
input: true,
}
case failureModeBlackholeSend:
return &blackholeFailer{
output: true,
}
default:
t.Fatalf("unknown failure mode %s", failureMode)
return nil
}
}

// failer fails and recovers a given node in some particular way.
type failer interface {
// Fail fails the given node.
Fail(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int)

@@ -628,99 +634,62 @@ type failureMode interface {
ExpectDeath() bool
}

// failureModeCrash is a process crash where the TCP/IP stack remains responsive
// crashFailer is a process crash where the TCP/IP stack remains responsive
// and sends immediate RST packets to peers.
type failureModeCrash struct {
type crashFailer struct {
startOpts option.StartOpts
startSettings install.ClusterSettings
}

func (f *failureModeCrash) String() string { return "crash" }
func (f *failureModeCrash) ExpectDeath() bool { return true }
func (f *crashFailer) ExpectDeath() bool { return true }

func (f *failureModeCrash) Fail(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int) {
func (f *crashFailer) Fail(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int) {
c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.Node(nodeID)) // uses SIGKILL
}

func (f *failureModeCrash) Recover(
ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
func (f *crashFailer) Recover(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int) {
c.Start(ctx, t.L(), f.startOpts, f.startSettings, c.Node(nodeID))
}

func (f *failureModeCrash) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
func (f *crashFailer) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
}

// failureModeBlackhole is a network outage where all inbound and outbound
// TCP/IP packets to/from port 26257 are dropped, causing network hangs and
// timeouts.
type failureModeBlackhole struct{}

func (f *failureModeBlackhole) String() string { return "blackhole" }
func (f *failureModeBlackhole) ExpectDeath() bool { return false }

func (f *failureModeBlackhole) Fail(
ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
c.Run(ctx, c.Node(nodeID), `sudo iptables -A INPUT -m multiport -p tcp --ports 26257 -j DROP`)
c.Run(ctx, c.Node(nodeID), `sudo iptables -A OUTPUT -m multiport -p tcp --ports 26257 -j DROP`)
}

func (f *failureModeBlackhole) Recover(
ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
c.Run(ctx, c.Node(nodeID), `sudo iptables -F`)
}

func (f *failureModeBlackhole) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Run(ctx, c.All(), `sudo iptables -F`)
}

// failureModeBlackholeRecv is an asymmetric network outage where all inbound
// TCP/IP packets to port 26257 are dropped, causing network hangs and timeouts.
// The node can still send traffic on outbound connections.
type failureModeBlackholeRecv struct{}

func (f *failureModeBlackholeRecv) String() string { return "blackhole-recv" }
func (f *failureModeBlackholeRecv) ExpectDeath() bool { return false }

func (f *failureModeBlackholeRecv) Fail(
ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
c.Run(ctx, c.Node(nodeID), `sudo iptables -A INPUT -p tcp --dport 26257 -j DROP`)
}

func (f *failureModeBlackholeRecv) Recover(
ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
c.Run(ctx, c.Node(nodeID), `sudo iptables -F`)
}

func (f *failureModeBlackholeRecv) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Run(ctx, c.All(), `sudo iptables -F`)
// blackholeFailer causes a network failure where TCP/IP packets to/from port
// 26257 are dropped, causing network hangs and timeouts.
//
// If only one of input or output is enabled, connections in that direction
// will fail (even already established connections), but connections in the
// other direction are still functional (including responses).
type blackholeFailer struct {
input bool
output bool
}

// failureModeBlackholeSend is an asymmetric network outage where all outbound
// TCP/IP packets to port 26257 are dropped, causing network hangs and
// timeouts. The node can still receive traffic on inbound connections.
type failureModeBlackholeSend struct{}

func (f *failureModeBlackholeSend) String() string { return "blackhole-send" }
func (f *failureModeBlackholeSend) ExpectDeath() bool { return false }

func (f *failureModeBlackholeSend) Fail(
ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
c.Run(ctx, c.Node(nodeID), `sudo iptables -A OUTPUT -p tcp --dport 26257 -j DROP`)
func (f *blackholeFailer) ExpectDeath() bool { return false }

func (f *blackholeFailer) Fail(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int) {
// When dropping both input and output, we use multiport to block traffic both
// to port 26257 and from port 26257 on either side of the connection, to
// avoid any spurious packets from making it through.
//
// We don't do this when only blocking in one direction, because e.g. in the
// input case we don't want inbound connections to work (INPUT to 26257), but
// we do want responses for outbound connections to work (INPUT from 26257).
if f.input && f.output {
c.Run(ctx, c.Node(nodeID), `sudo iptables -A INPUT -m multiport -p tcp --ports 26257 -j DROP`)
c.Run(ctx, c.Node(nodeID), `sudo iptables -A OUTPUT -m multiport -p tcp --ports 26257 -j DROP`)
} else if f.input {
c.Run(ctx, c.Node(nodeID), `sudo iptables -A INPUT -p tcp --dport 26257 -j DROP`)
} else if f.output {
c.Run(ctx, c.Node(nodeID), `sudo iptables -A OUTPUT -p tcp --dport 26257 -j DROP`)
}
}

func (f *failureModeBlackholeSend) Recover(
ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
) {
func (f *blackholeFailer) Recover(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int) {
c.Run(ctx, c.Node(nodeID), `sudo iptables -F`)
}

func (f *failureModeBlackholeSend) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
func (f *blackholeFailer) Cleanup(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Run(ctx, c.All(), `sudo iptables -F`)
}

Expand All @@ -743,8 +712,7 @@ func relocateRanges(
t.Status(fmt.Sprintf("moving %d ranges off of n%d (%s)", count, source, predicate))
for _, target := range to {
_, err := conn.ExecContext(ctx, `ALTER RANGE RELOCATE FROM $1::int TO $2::int FOR `+
`SELECT range_id FROM crdb_internal.ranges WHERE `+where,
source, target)
`SELECT range_id FROM crdb_internal.ranges WHERE `+where, source, target)
require.NoError(t, err)
}
time.Sleep(time.Second)
@@ -766,8 +734,7 @@ func relocateLeases(t test.Test, ctx context.Context, conn *gosql.DB, predicate
}
t.Status(fmt.Sprintf("moving %d leases to n%d (%s)", count, to, predicate))
_, err := conn.ExecContext(ctx, `ALTER RANGE RELOCATE LEASE TO $1::int FOR `+
`SELECT range_id FROM crdb_internal.ranges WHERE `+where,
to)
`SELECT range_id FROM crdb_internal.ranges WHERE `+where, to)
// When a node recovers, it may not have gossiped its store key yet.
if err != nil && !strings.Contains(err.Error(), "KeyNotPresentError") {
require.NoError(t, err)
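Taken together, the three runFailover* bodies now share one shape: build the failer up front (so the crash failer can capture the start options and settings), defer its Cleanup, and drive Fail/Recover from the worker. Below is a minimal sketch of that flow under the same assumptions as the diff above; `runFailoverFlowSketch` is an illustrative name, and the monitor, workload, ticker, and range/lease-relocation steps are elided.

```go
// Sketch of the per-test flow after this change; workload, monitor, and
// relocation plumbing from the real tests is elided.
func runFailoverFlowSketch(
	ctx context.Context, t test.Test, c cluster.Cluster, failureMode failureMode,
) {
	opts := option.DefaultStartOpts()
	settings := install.MakeClusterSettings()
	// Built before the cluster starts so crashFailer can restart nodes with
	// the same start options and settings.
	failer := makeFailer(t, failureMode, opts, settings)
	c.Put(ctx, t.Cockroach(), "./cockroach")
	c.Start(ctx, t.L(), opts, settings, c.Range(1, 6))
	defer failer.Cleanup(ctx, t, c)

	for _, node := range []int{4, 5, 6} {
		t.Status(fmt.Sprintf("failing n%d (%s)", node, failureMode))
		if failer.ExpectDeath() {
			t.L().Printf("expecting death of n%d", node) // the real tests call monitor.ExpectDeath()
		}
		failer.Fail(ctx, t, c, node)
		// ... let the workload observe the outage for a while (elided) ...
		t.Status(fmt.Sprintf("recovering n%d (%s)", node, failureMode))
		failer.Recover(ctx, t, c, node)
	}
}
```

The ExpectDeath hook is what lets the crash failer coexist with the cluster monitor, while the blackhole failers leave the process running and only manipulate iptables rules.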