From bf8775ec8ba58162b58603b3e5343d91b58f31e7 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Sat, 18 Feb 2023 02:12:23 +0800 Subject: [PATCH] add WriterData field to the message struct (#1059) --- message.go | 5 +++++ writer_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/message.go b/message.go index 5fb7b8ebe..c320006ba 100644 --- a/message.go +++ b/message.go @@ -20,6 +20,11 @@ type Message struct { Value []byte Headers []Header + // This field is used to hold arbitrary data you wish to include, so it + // will be available when handle it on the Writer's `Completion` method, + // this support the application can do any post operation on each message. + WriterData interface{} + // If not set at the creation, Time will be automatically set when // writing the message. Time time.Time diff --git a/writer_test.go b/writer_test.go index 988ca59e2..d34358865 100644 --- a/writer_test.go +++ b/writer_test.go @@ -174,6 +174,10 @@ func TestWriter(t *testing.T) { scenario: "test default configuration values", function: testWriterDefaults, }, + { + scenario: "test write message with writer data", + function: testWriteMessageWithWriterData, + }, } for _, test := range tests { @@ -719,6 +723,45 @@ func testWriterUnexpectedMessageTopic(t *testing.T) { } } +func testWriteMessageWithWriterData(t *testing.T) { + topic := makeTopic() + createTopic(t, topic, 1) + defer deleteTopic(t, topic) + w := newTestWriter(WriterConfig{ + Topic: topic, + Balancer: &RoundRobin{}, + }) + defer w.Close() + + index := 0 + w.Completion = func(messages []Message, err error) { + if err != nil { + t.Errorf("unexpected error %v", err) + } + + for _, msg := range messages { + meta := msg.WriterData.(int) + if index != meta { + t.Errorf("metadata is not correct, index = %d, writerData = %d", index, meta) + } + index += 1 + } + } + + msg := Message{Key: []byte("key"), Value: []byte("Hello World")} + for i := 0; i < 5; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + msg.WriterData = i + err := w.WriteMessages(ctx, msg) + if err != nil { + t.Errorf("unexpected error %v", err) + } + } + +} + func testWriterAutoCreateTopic(t *testing.T) { topic := makeTopic() // Assume it's going to get created.