Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: pool-related test flakiness #14076

Merged
merged 4 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
Expand Down Expand Up @@ -189,18 +190,20 @@ func (shard *Shard) Replica() *Vttablet {
return nil
}

// CtrlCHandler handles the teardown for the ctrl-c.
func (cluster *LocalProcessCluster) CtrlCHandler() {
// SetupCtrlCHandler handles the teardown for the ctrl-c.
func (cluster *LocalProcessCluster) SetupCtrlCHandler() {
cluster.Context, cluster.CancelFunc = context.WithCancel(context.Background())

c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
select {
case <-c:
cluster.Teardown()
os.Exit(0)
case <-cluster.Done():
}
go func() {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
select {
case <-c:
cluster.Teardown()
os.Exit(0)
case <-cluster.Done():
}
}()
}

// StartTopo starts topology server
Expand Down Expand Up @@ -715,7 +718,7 @@ func (cluster *LocalProcessCluster) NewVtgateInstance() *VtgateProcess {
// NewBareCluster instantiates a new cluster and does not assume existence of any of the vitess processes
func NewBareCluster(cell string, hostname string) *LocalProcessCluster {
cluster := &LocalProcessCluster{Cell: cell, Hostname: hostname, mx: new(sync.Mutex), DefaultCharset: "utf8mb4"}
go cluster.CtrlCHandler()
cluster.SetupCtrlCHandler()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😆


cluster.OriginalVTDATAROOT = os.Getenv("VTDATAROOT")
cluster.CurrentVTDATAROOT = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("vtroot_%d", cluster.GetAndReservePort()))
Expand Down Expand Up @@ -941,28 +944,28 @@ func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context,
return err
}

conditionSuccess := false
timeoutExceeded := false
var conditionSuccess atomic.Bool
var timeoutExceeded atomic.Bool
go func() {
time.Sleep(timeout)
timeoutExceeded = true
timeoutExceeded.Store(true)
}()

err = conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
if condition(shr) {
conditionSuccess = true
conditionSuccess.Store(true)
}
if timeoutExceeded || conditionSuccess {
if timeoutExceeded.Load() || conditionSuccess.Load() {
return io.EOF
}
return nil
})

if conditionSuccess {
if conditionSuccess.Load() {
return nil
}

if timeoutExceeded {
if timeoutExceeded.Load() {
return errors.New("timeout exceed while waiting for the condition in StreamHealth")
}
return err
Expand Down
24 changes: 14 additions & 10 deletions go/test/endtoend/tabletmanager/tablet_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -250,17 +251,19 @@ func TestHealthCheckSchemaChangeSignal(t *testing.T) {

func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn, primaryTablet *cluster.Vttablet, viewsEnabled bool) {
var streamErr error
wg := sync.WaitGroup{}
var wg sync.WaitGroup
var ranOnce atomic.Bool
var finished atomic.Bool

wg.Add(1)
ranOnce := false
finished := false
ch := make(chan *querypb.StreamHealthResponse)

go func() {
defer wg.Done()
streamErr = clusterInstance.StreamTabletHealthUntil(context.Background(), primaryTablet, 30*time.Second, func(shr *querypb.StreamHealthResponse) bool {
ranOnce = true
ranOnce.Store(true)
// If we are finished, then close the channel and end the stream.
if finished {
if finished.Load() {
close(ch)
return true
}
Expand All @@ -272,13 +275,14 @@ func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn,
// The test becomes flaky if we run the DDL immediately after starting the above go routine because the client for the Stream
// sometimes isn't registered by the time DDL runs, and it misses the update we get. To prevent this situation, we wait for one Stream packet
// to have returned. Once we know we received a Stream packet, then we know that we are registered for the health stream and can execute the DDL.
for i := 0; i < 30; i++ {
if ranOnce {
break
}
for i := 0; i < 30 && !ranOnce.Load(); i++ {
time.Sleep(1 * time.Second)
}

if !ranOnce.Load() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick of nitpicks - github.com/stretchr/testify are our friends.

require.True(ranOnce.Load(), "HealthCheck did not ran?")

t.Fatalf("HealthCheck did not ran?")
}

verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area` (`id` int NOT NULL, `country` varchar(30), PRIMARY KEY (`id`))", "area")
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area2` (`id` int NOT NULL, PRIMARY KEY (`id`))", "area2")
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE VIEW v2 as select * from t1", viewsEnabled)
Expand All @@ -288,7 +292,7 @@ func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn,
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP VIEW v2", viewsEnabled)
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP TABLE `area`", "area")

finished = true
finished.Store(true)
wg.Wait()
require.NoError(t, streamErr)
}
Expand Down