fix writer async close #805

Merged
merged 1 commit on Dec 13, 2021
78 changes: 42 additions & 36 deletions writer.go
@@ -185,12 +185,10 @@ type Writer struct {
// If nil, DefaultTransport is used.
Transport RoundTripper

// Atomic flag indicating whether the writer has been closed.
closed uint32
group sync.WaitGroup

// Manages the current set of partition-topic writers.
group sync.WaitGroup
mutex sync.Mutex
closed bool
writers map[topicPartition]*partitionWriter

// writer stats are all made of atomic values, no need for synchronization.
@@ -505,13 +503,47 @@ func NewWriter(config WriterConfig) *Writer {
return w
}

// enter is called by WriteMessages to indicate that a new inflight operation
// has started, which helps synchronize with Close and ensures that Close does
// not return until all inflight operations have completed.
func (w *Writer) enter() bool {
w.mutex.Lock()
defer w.mutex.Unlock()
if w.closed {
return false
}
w.group.Add(1)
return true
}

// leave is called by WriteMessages to indicate that the inflight operation has
// completed.
func (w *Writer) leave() { w.group.Done() }

// spawn starts a new asynchronous operation on the writer. This method is used
// instead of starting goroutines inline to help manage the state of the
// writer's wait group. The wait group is used to block Close calls until all
// inflight operations have completed, therefore automatically including those
// started with calls to spawn.
func (w *Writer) spawn(f func()) {
w.group.Add(1)
go func() {
defer w.group.Done()
f()
}()
}

// Close flushes pending writes and waits for all writes to complete before
// returning. Calling Close also prevents new writes from being submitted to
// the writer; further calls to WriteMessages and the like will fail with
// io.ErrClosedPipe.
func (w *Writer) Close() error {
w.markClosed()
w.mutex.Lock()
// Marking the writer as closed here causes future calls to WriteMessages to
// fail with io.ErrClosedPipe. Mutation of this field is synchronized on the
// writer's mutex to ensure that no more increments of the wait group are
// performed afterwards (which could otherwise race with the Wait below).
w.closed = true

// close all writers to trigger any pending batches
for _, writer := range w.writers {
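The heart of this change is that the closed flag and the wait group are now guarded by the same mutex, so WriteMessages can never register a new inflight operation after Close has decided to wait. Below is a minimal, self-contained sketch of that pattern; it is illustrative only, and the `gate` type and its names are not part of kafka-go:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// gate couples a closed flag with a wait group behind a single mutex, so an
// inflight operation can only be registered while the gate is still open.
type gate struct {
	mutex  sync.Mutex
	closed bool
	group  sync.WaitGroup
}

// enter registers an inflight operation, or reports that the gate is closed.
func (g *gate) enter() bool {
	g.mutex.Lock()
	defer g.mutex.Unlock()
	if g.closed {
		return false
	}
	// Safe: close cannot flip closed to true and start waiting while we hold
	// the mutex, so this Add never races with the Wait below.
	g.group.Add(1)
	return true
}

// leave marks an inflight operation as complete.
func (g *gate) leave() { g.group.Done() }

// close rejects new operations, then waits for all inflight ones to finish.
func (g *gate) close() {
	g.mutex.Lock()
	g.closed = true
	g.mutex.Unlock()
	g.group.Wait()
}

func main() {
	g := new(gate)
	if g.enter() {
		go func() {
			defer g.leave()
			fmt.Println("doing inflight work")
		}()
	}
	g.close() // blocks until the goroutine above calls leave
	if !g.enter() {
		fmt.Println(errors.New("gate is closed"))
	}
}
```

Compare this with the previous approach visible in the diff, where closed was a separate atomic flag and nothing prevented WriteMessages from incrementing the wait group after Close had already begun waiting on it.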
@@ -561,12 +593,10 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address")
}

w.group.Add(1)
defer w.group.Done()

if w.isClosed() {
if !w.enter() {
return io.ErrClosedPipe
}
defer w.leave()

if len(msgs) == 0 {
return nil
@@ -720,14 +750,6 @@ func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
return 0, UnknownTopicOrPartition
}

func (w *Writer) markClosed() {
atomic.StoreUint32(&w.closed, 1)
}

func (w *Writer) isClosed() bool {
return atomic.LoadUint32(&w.closed) != 0
}

func (w *Writer) client(timeout time.Duration) *Client {
return &Client{
Addr: w.Addr,
@@ -931,8 +953,6 @@ type partitionWriter struct {
mutex sync.Mutex
currBatch *writeBatch

group sync.WaitGroup

// reference to the writer that owns this batch. Used for the produce logic
// as well as stat tracking
w *Writer
@@ -944,12 +964,7 @@ func newPartitionWriter(w *Writer, key topicPartition) *partitionWriter {
queue: newBatchQueue(10),
w: w,
}
go func() {
writer.group.Add(1)
defer writer.group.Done()
writer.writeBatches()
}()

w.spawn(writer.writeBatches)
return writer
}

@@ -965,14 +980,10 @@ func (ptw *partitionWriter) writeBatches() {
}

ptw.writeBatch(batch)

}
}

func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*writeBatch][]int32 {
ptw.group.Add(1)
defer ptw.group.Done()

ptw.mutex.Lock()
defer ptw.mutex.Unlock()

@@ -1014,11 +1025,7 @@ func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*writeBatch][]int32 {
// ptw.w can be accessed here because this is called with the lock ptw.mutex already held.
func (ptw *partitionWriter) newWriteBatch() *writeBatch {
batch := newWriteBatch(time.Now(), ptw.w.batchTimeout())
ptw.group.Add(1)
go func() {
defer ptw.group.Done()
ptw.awaitBatch(batch)
}()
ptw.w.spawn(func() { ptw.awaitBatch(batch) })
return batch
}

@@ -1145,9 +1152,8 @@ func (ptw *partitionWriter) close() {
ptw.currBatch = nil
batch.trigger()
}
ptw.queue.Close()

ptw.group.Wait()
ptw.queue.Close()
}

type writeBatch struct {
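To round out the diff, here is an end-to-end illustration of the caller-facing behaviour described in the Close comment above: writes that are already inflight are flushed and waited on, and any WriteMessages call made after Close fails with io.ErrClosedPipe. This is a sketch, not part of the change; the broker address and topic are placeholders, and it assumes the usual kafka-go constructors (kafka.TCP, kafka.Message):

```go
package main

import (
	"context"
	"errors"
	"io"
	"log"
	"sync"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"), // placeholder broker address
		Topic: "example-topic",             // placeholder topic
	}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Writes that enter before Close are flushed and waited on;
			// writes that start after Close fail with io.ErrClosedPipe.
			err := w.WriteMessages(context.Background(), kafka.Message{Value: []byte("hello")})
			if err != nil {
				log.Println("write:", err)
			}
		}()
	}

	// Close flushes pending batches and blocks until inflight writes return.
	if err := w.Close(); err != nil {
		log.Println("close:", err)
	}
	wg.Wait()

	// New writes submitted after Close are rejected.
	err := w.WriteMessages(context.Background(), kafka.Message{Value: []byte("too late")})
	if errors.Is(err, io.ErrClosedPipe) {
		log.Println("writer is closed, as expected")
	}
}
```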