Skip to content

Commit

Permalink
messager: Fix a deadlock bug
Browse files Browse the repository at this point in the history
The root cause of the deadlock is that message manager calls
into tabletserver, which calls back into it.
This can cause deadlocks and Close can hang forever.

BUG=35763775
  • Loading branch information
sougou committed Feb 25, 2017
1 parent b11e9f9 commit c67d56f
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions go/vt/tabletserver/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type MessageManager struct {
}

// NewMessageManager creates a new message manager.
// Calls into tsv have to be made asynchronously. Otherwise,
// it can lead to deadlocks.
func NewMessageManager(tsv *TabletServer, table *schema.Table, conns *connpool.Pool) *MessageManager {
mm := &MessageManager{
tsv: tsv,
Expand Down Expand Up @@ -326,18 +328,18 @@ func (mm *MessageManager) send(receiver *receiverWithStatus, qr *sqltypes.Result
for i, row := range qr.Rows {
ids[i] = row[0].String()
}
// Postpone the messages for resend before discarding
// from cache. If no timely ack is received, it will be resent.
mm.postpone(ids)
// postpone should discard, but this is a safety measure
// in case it fails.
mm.cache.Discard(ids)
go postpone(mm.tsv, mm.name.String(), mm.ackWaitTime, ids)
}

func (mm *MessageManager) postpone(ids []string) {
ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.ackWaitTime)
// postpone is a non-member because it should be called asynchronously and should
// not rely on members of MessageManager.
func postpone(tsv *TabletServer, name string, ackWaitTime time.Duration, ids []string) {
ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), ackWaitTime)
defer cancel()
_, err := mm.tsv.PostponeMessages(ctx, nil, mm.name.String(), ids)
_, err := tsv.PostponeMessages(ctx, nil, name, ids)
if err != nil {
// TODO(sougou): increment internal error.
log.Errorf("Unable to postpone messages %v: %v", ids, err)
Expand Down Expand Up @@ -406,10 +408,16 @@ func (mm *MessageManager) runPoller() {
}

func (mm *MessageManager) runPurge() {
ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.purgeTicks.Interval())
go purge(mm.tsv, mm.name.String(), mm.purgeAfter, mm.purgeTicks.Interval())
}

// purge is a non-member because it should be called asynchronously and should
// not rely on members of MessageManager.
func purge(tsv *TabletServer, name string, purgeAfter, purgeInterval time.Duration) {
ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), purgeInterval)
defer cancel()
for {
count, err := mm.tsv.PurgeMessages(ctx, nil, mm.name.String(), time.Now().Add(-mm.purgeAfter).UnixNano())
count, err := tsv.PurgeMessages(ctx, nil, name, time.Now().Add(-purgeAfter).UnixNano())
if err != nil {
// TODO(sougou): increment internal error.
log.Errorf("Unable to delete messages: %v", err)
Expand Down

0 comments on commit c67d56f

Please sign in to comment.