Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit tests for wrangler version of TabletExternallyReparented #5292

Merged
merged 3 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions go/vt/vttablet/tabletmanager/action_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,19 @@ type ActionAgent struct {
// It's only set once in NewActionAgent() and never modified after that.
orc *orcClient

// shardSyncChan is a channel for informing the shard sync goroutine that
// mutex protects all the following fields (that start with '_'),
// only hold the mutex to update the fields, nothing else.
mutex sync.Mutex

// _shardSyncChan is a channel for informing the shard sync goroutine that
// it should wake up and recheck the tablet state, to make sure it and the
// shard record are in sync.
//
// Call agent.notifyShardSync() instead of sending directly to this channel.
shardSyncChan chan struct{}
_shardSyncChan chan struct{}

// shardSyncCancel is the function to stop the background shard sync goroutine.
shardSyncCancel context.CancelFunc

// mutex protects all the following fields (that start with '_'),
// only hold the mutex to update the fields, nothing else.
mutex sync.Mutex
// _shardSyncCancel is the function to stop the background shard sync goroutine.
_shardSyncCancel context.CancelFunc

// _tablet has the Tablet record we last read from the topology server.
_tablet *topodatapb.Tablet
Expand Down
27 changes: 18 additions & 9 deletions go/vt/vttablet/tabletmanager/shard_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ var (
// a notification signal from setTablet().
func (agent *ActionAgent) shardSyncLoop(ctx context.Context) {
// Make a copy of the channel so we don't race when stopShardSync() clears it.
notifyChan := agent.shardSyncChan
agent.mutex.Lock()
notifyChan := agent._shardSyncChan
agent.mutex.Unlock()

// retryChan is how we wake up after going to sleep between retries.
// If no retry is pending, this channel will be nil, which means it's fine
Expand Down Expand Up @@ -224,9 +226,11 @@ func (agent *ActionAgent) startShardSync() {
// even if the receiver is busy. We can drop any additional send attempts
// if the buffer is full because all we care about is that the receiver will
// be told it needs to recheck the state.
agent.shardSyncChan = make(chan struct{}, 1)
agent.mutex.Lock()
agent._shardSyncChan = make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())
agent.shardSyncCancel = cancel
agent._shardSyncCancel = cancel
agent.mutex.Unlock()

// Queue up a pending notification to force the loop to run once at startup.
agent.notifyShardSync()
Expand All @@ -236,23 +240,28 @@ func (agent *ActionAgent) startShardSync() {
}

func (agent *ActionAgent) stopShardSync() {
if agent.shardSyncCancel != nil {
agent.shardSyncCancel()
agent.shardSyncCancel = nil
agent.shardSyncChan = nil
agent.mutex.Lock()
if agent._shardSyncCancel != nil {
agent._shardSyncCancel()
agent._shardSyncCancel = nil
agent._shardSyncChan = nil
}
agent.mutex.Unlock()
}

func (agent *ActionAgent) notifyShardSync() {
// If this is called before the shard sync is started, do nothing.
if agent.shardSyncChan == nil {
agent.mutex.Lock()
defer agent.mutex.Unlock()

if agent._shardSyncChan == nil {
return
}

// Try to send. If the channel buffer is full, it means a notification is
// already pending, so we don't need to do anything.
select {
case agent.shardSyncChan <- struct{}{}:
case agent._shardSyncChan <- struct{}{}:
default:
}
}
Expand Down
Loading