diff --git a/go/vt/tabletserver/message_manager.go b/go/vt/tabletserver/message_manager.go
index 7956890b7d8..0074e627103 100644
--- a/go/vt/tabletserver/message_manager.go
+++ b/go/vt/tabletserver/message_manager.go
@@ -107,6 +107,8 @@ type MessageManager struct {
 }
 
 // NewMessageManager creates a new message manager.
+// Calls into tsv have to be made asynchronously. Otherwise,
+// it can lead to deadlocks.
 func NewMessageManager(tsv *TabletServer, table *schema.Table, conns *connpool.Pool) *MessageManager {
 	mm := &MessageManager{
 		tsv: tsv,
@@ -326,18 +328,18 @@ func (mm *MessageManager) send(receiver *receiverWithStatus, qr *sqltypes.Result
 	for i, row := range qr.Rows {
 		ids[i] = row[0].String()
 	}
-	// Postpone the messages for resend before discarding
-	// from cache. If no timely ack is received, it will be resent.
-	mm.postpone(ids)
 	// postpone should discard, but this is a safety measure
 	// in case it fails.
 	mm.cache.Discard(ids)
+	go postpone(mm.tsv, mm.name.String(), mm.ackWaitTime, ids)
 }
 
-func (mm *MessageManager) postpone(ids []string) {
-	ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.ackWaitTime)
+// postpone is a non-member because it should be called asynchronously and should
+// not rely on members of MessageManager.
+func postpone(tsv *TabletServer, name string, ackWaitTime time.Duration, ids []string) {
+	ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), ackWaitTime)
 	defer cancel()
-	_, err := mm.tsv.PostponeMessages(ctx, nil, mm.name.String(), ids)
+	_, err := tsv.PostponeMessages(ctx, nil, name, ids)
 	if err != nil {
 		// TODO(sougou): increment internal error.
 		log.Errorf("Unable to postpone messages %v: %v", ids, err)
@@ -406,10 +408,16 @@ func (mm *MessageManager) runPoller() {
 }
 
 func (mm *MessageManager) runPurge() {
-	ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.purgeTicks.Interval())
+	go purge(mm.tsv, mm.name.String(), mm.purgeAfter, mm.purgeTicks.Interval())
+}
+
+// purge is a non-member because it should be called asynchronously and should
+// not rely on members of MessageManager.
+func purge(tsv *TabletServer, name string, purgeAfter, purgeInterval time.Duration) {
+	ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), purgeInterval)
 	defer cancel()
 	for {
-		count, err := mm.tsv.PurgeMessages(ctx, nil, mm.name.String(), time.Now().Add(-mm.purgeAfter).UnixNano())
+		count, err := tsv.PurgeMessages(ctx, nil, name, time.Now().Add(-purgeAfter).UnixNano())
 		if err != nil {
 			// TODO(sougou): increment internal error.
 			log.Errorf("Unable to delete messages: %v", err)