From 0d77b66c357fddc3cc03f12f994430bd12adb78c Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 16 Feb 2023 17:12:11 +0800 Subject: [PATCH 1/6] add reader interceptor --- pulsar/reader.go | 21 +++++++++++++++++++++ pulsar/reader_impl.go | 18 ++++++++++++++---- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/pulsar/reader.go b/pulsar/reader.go index e4679abee2..ca5233cefa 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -92,6 +92,12 @@ type ReaderOptions struct { // BackoffPolicy parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) BackoffPolicy internal.BackoffPolicy + + AutoUpdatePartitions bool + + AutoUpdatePartitionIntervalSeconds int32 + + ReaderInterceptors []ReaderInterceptor } // Reader can be used to scan through all the messages currently available in a topic. @@ -125,3 +131,18 @@ type Reader interface { // SeekByTime(time time.Time) error } + +type ReaderInterceptor interface { + // BeforeRead called after read messages from Reader + BeforeRead(consumerMessage ConsumerMessage) +} + +type readerInterceptors []ReaderInterceptor + +func (interceptors readerInterceptors) BeforeRead(consumerMessage ConsumerMessage) { + for i := range interceptors { + interceptors[i].BeforeRead(consumerMessage) + } +} + +var defaultReaderInterceptors = make(readerInterceptors, 0) diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 079754b0bf..dac90fc797 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -40,6 +40,7 @@ type reader struct { lastMessageInBroker trackingMessageID log log.Logger metrics *internal.LeveledMetrics + interceptors readerInterceptors } func newReader(client *client, options ReaderOptions) (Reader, error) { @@ -109,11 +110,19 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { backoffPolicy: options.BackoffPolicy, } + var interceptors0 readerInterceptors + if options.ReaderInterceptors == nil { + interceptors0 = defaultReaderInterceptors + } else { + interceptors0 = options.ReaderInterceptors + } + reader := &reader{ - client: client, - messageCh: make(chan ConsumerMessage), - log: client.log.SubLogger(log.Fields{"topic": options.Topic}), - metrics: client.metrics.GetLeveledMetrics(options.Topic), + client: client, + messageCh: make(chan ConsumerMessage), + log: client.log.SubLogger(log.Fields{"topic": options.Topic}), + metrics: client.metrics.GetLeveledMetrics(options.Topic), + interceptors: interceptors0, } // Provide dummy dlq router with not dlq policy @@ -145,6 +154,7 @@ func (r *reader) Next(ctx context.Context) (Message, error) { return nil, newError(ConsumerClosed, "consumer closed") } + r.interceptors.BeforeRead(cm) // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, // it will specify the subscription position anyway msgID := cm.Message.ID() From fea08474a868fc0fdf45af22229fefdaf362fb38 Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 22 Feb 2023 01:09:38 +0800 Subject: [PATCH 2/6] add reader interceptor --- pulsar/reader.go | 15 ----------- pulsar/reader_impl.go | 19 ++++---------- pulsar/reader_interceptor.go | 49 ++++++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 29 deletions(-) create mode 100644 pulsar/reader_interceptor.go diff --git a/pulsar/reader.go b/pulsar/reader.go index ca5233cefa..9cea21e963 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -131,18 +131,3 @@ type Reader interface { // SeekByTime(time time.Time) error } - -type ReaderInterceptor interface { - // BeforeRead called after read messages from Reader - BeforeRead(consumerMessage ConsumerMessage) -} - -type readerInterceptors []ReaderInterceptor - -func (interceptors readerInterceptors) BeforeRead(consumerMessage ConsumerMessage) { - for i := range interceptors { - interceptors[i].BeforeRead(consumerMessage) - } -} - -var defaultReaderInterceptors = make(readerInterceptors, 0) diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index dac90fc797..2cd50aa4dc 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -40,7 +40,6 @@ type reader struct { lastMessageInBroker trackingMessageID log log.Logger metrics *internal.LeveledMetrics - interceptors readerInterceptors } func newReader(client *client, options ReaderOptions) (Reader, error) { @@ -108,21 +107,14 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { decryption: options.Decryption, schema: options.Schema, backoffPolicy: options.BackoffPolicy, - } - - var interceptors0 readerInterceptors - if options.ReaderInterceptors == nil { - interceptors0 = defaultReaderInterceptors - } else { - interceptors0 = options.ReaderInterceptors + interceptors: transformReaderInterceptors(options.ReaderInterceptors), } reader := &reader{ - client: client, - messageCh: make(chan ConsumerMessage), - log: client.log.SubLogger(log.Fields{"topic": options.Topic}), - metrics: client.metrics.GetLeveledMetrics(options.Topic), - interceptors: interceptors0, + client: client, + messageCh: make(chan ConsumerMessage), + log: client.log.SubLogger(log.Fields{"topic": options.Topic}), + metrics: client.metrics.GetLeveledMetrics(options.Topic), } // Provide dummy dlq router with not dlq policy @@ -154,7 +146,6 @@ func (r *reader) Next(ctx context.Context) (Message, error) { return nil, newError(ConsumerClosed, "consumer closed") } - r.interceptors.BeforeRead(cm) // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, // it will specify the subscription position anyway msgID := cm.Message.ID() diff --git a/pulsar/reader_interceptor.go b/pulsar/reader_interceptor.go new file mode 100644 index 0000000000..58077fff5a --- /dev/null +++ b/pulsar/reader_interceptor.go @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type ReaderInterceptor interface { + // BeforeRead called after messages received + BeforeRead(consumerMessage ConsumerMessage) +} + +func transformReaderInterceptors(readerInterceptors []ReaderInterceptor) ConsumerInterceptors { + if len(readerInterceptors) <= 0 { + return defaultConsumerInterceptors + } + + consumerInterceptors := make(ConsumerInterceptors, len(readerInterceptors)) + for i := range readerInterceptors { + consumerInterceptors[i] = consumerInterceptor{readerInterceptors[i]} + } + return consumerInterceptors +} + +type consumerInterceptor struct { + readerInterceptor ReaderInterceptor +} + +func (c consumerInterceptor) BeforeConsume(message ConsumerMessage) { + c.readerInterceptor.BeforeRead(message) +} + +func (c consumerInterceptor) OnAcknowledge(Consumer, MessageID) { +} + +func (c consumerInterceptor) OnNegativeAcksSend(Consumer, []MessageID) { +} From b53e8a67670887328ee227abeed751727c8e4279 Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 22 Feb 2023 01:15:11 +0800 Subject: [PATCH 3/6] add reader interceptor --- pulsar/reader.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pulsar/reader.go b/pulsar/reader.go index 9cea21e963..20b5dea587 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -93,10 +93,7 @@ type ReaderOptions struct { // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) BackoffPolicy internal.BackoffPolicy - AutoUpdatePartitions bool - - AutoUpdatePartitionIntervalSeconds int32 - + // ReaderInterceptors call before messages consumed. ReaderInterceptors []ReaderInterceptor } From 7ecf6d3eaa62efc4ab42f632a50b2492d1e3fa55 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 21 Oct 2023 19:30:11 +0800 Subject: [PATCH 4/6] Add tests --- pulsar/reader_impl.go | 2 +- pulsar/reader_test.go | 117 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 6328891d15..a514370667 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -114,7 +114,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { decryption: options.Decryption, schema: options.Schema, backoffPolicy: options.BackoffPolicy, - interceptors: transformReaderInterceptors(options.ReaderInterceptors), + interceptors: transformReaderInterceptors(options.ReaderInterceptors), maxPendingChunkedMessage: options.MaxPendingChunkedMessage, expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, autoAckIncompleteChunk: options.AutoAckIncompleteChunk, diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index ec10f8f162..38524ccdd5 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -20,6 +20,7 @@ package pulsar import ( "context" "fmt" + "sync/atomic" "testing" "time" @@ -943,3 +944,119 @@ func TestReaderGetLastMessageID(t *testing.T) { assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID()) assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID()) } + +var _ ReaderInterceptor = (*CounterReaderInterceptor)(nil) + +type CounterReaderInterceptor struct { + counter atomic.Int32 +} + +func (c *CounterReaderInterceptor) BeforeRead(_ ConsumerMessage) { + c.counter.Add(1) +} + +func TestSingleReaderInterceptor(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + schema := NewStringSchema(nil) + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + Schema: schema, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + } + + interceptor := &CounterReaderInterceptor{} + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + ReaderInterceptors: []ReaderInterceptor{interceptor}, + StartMessageID: EarliestMessageID(), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + defer r.Close() + + for { + if r.HasNext() { + _, err := r.Next(ctx) + assert.NoError(t, err) + } else { + break + } + } + + assert.Equal(t, interceptor.counter.Load(), int32(10)) +} + +func TestMultiReaderInterceptors(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + schema := NewStringSchema(nil) + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + Schema: schema, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + } + + interceptor0 := &CounterReaderInterceptor{} + interceptor1 := &CounterReaderInterceptor{} + interceptor2 := &CounterReaderInterceptor{} + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + ReaderInterceptors: []ReaderInterceptor{interceptor0, interceptor1, interceptor2}, + StartMessageID: EarliestMessageID(), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + defer r.Close() + + for { + if r.HasNext() { + _, err := r.Next(ctx) + assert.NoError(t, err) + } else { + break + } + } + + assert.Equal(t, interceptor0.counter.Load(), int32(10)) + assert.Equal(t, interceptor1.counter.Load(), int32(10)) + assert.Equal(t, interceptor2.counter.Load(), int32(10)) +} From f947c32bd12041293b69ad44858f085881ead729 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 21 Oct 2023 19:48:54 +0800 Subject: [PATCH 5/6] fix language level --- pulsar/reader_test.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 38524ccdd5..89c268127a 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -945,14 +945,19 @@ func TestReaderGetLastMessageID(t *testing.T) { assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID()) } -var _ ReaderInterceptor = (*CounterReaderInterceptor)(nil) +var _ ReaderInterceptor = (*counterReaderInterceptor)(nil) -type CounterReaderInterceptor struct { - counter atomic.Int32 +type counterReaderInterceptor struct { + v *int32 } -func (c *CounterReaderInterceptor) BeforeRead(_ ConsumerMessage) { - c.counter.Add(1) +func newInterceptor() *counterReaderInterceptor { + var v = int32(0) + return &counterReaderInterceptor{v: &v} +} + +func (c *counterReaderInterceptor) BeforeRead(_ ConsumerMessage) { + atomic.AddInt32(c.v, int32(1)) } func TestSingleReaderInterceptor(t *testing.T) { @@ -983,7 +988,7 @@ func TestSingleReaderInterceptor(t *testing.T) { assert.NotNil(t, msgID) } - interceptor := &CounterReaderInterceptor{} + interceptor := newInterceptor() r, err := client.CreateReader(ReaderOptions{ Topic: topic, ReaderInterceptors: []ReaderInterceptor{interceptor}, @@ -1003,7 +1008,7 @@ func TestSingleReaderInterceptor(t *testing.T) { } } - assert.Equal(t, interceptor.counter.Load(), int32(10)) + assert.Equal(t, atomic.LoadInt32(interceptor.v), int32(10)) } func TestMultiReaderInterceptors(t *testing.T) { @@ -1034,9 +1039,9 @@ func TestMultiReaderInterceptors(t *testing.T) { assert.NotNil(t, msgID) } - interceptor0 := &CounterReaderInterceptor{} - interceptor1 := &CounterReaderInterceptor{} - interceptor2 := &CounterReaderInterceptor{} + interceptor0 := newInterceptor() + interceptor1 := newInterceptor() + interceptor2 := newInterceptor() r, err := client.CreateReader(ReaderOptions{ Topic: topic, ReaderInterceptors: []ReaderInterceptor{interceptor0, interceptor1, interceptor2}, @@ -1056,7 +1061,7 @@ func TestMultiReaderInterceptors(t *testing.T) { } } - assert.Equal(t, interceptor0.counter.Load(), int32(10)) - assert.Equal(t, interceptor1.counter.Load(), int32(10)) - assert.Equal(t, interceptor2.counter.Load(), int32(10)) + assert.Equal(t, atomic.LoadInt32(interceptor0.v), int32(10)) + assert.Equal(t, atomic.LoadInt32(interceptor1.v), int32(10)) + assert.Equal(t, atomic.LoadInt32(interceptor2.v), int32(10)) } From bb4a21ea1d7a52c4c8980788890245e5fe57e87e Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 21 Oct 2023 19:58:05 +0800 Subject: [PATCH 6/6] fix language level --- pulsar/reader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 89c268127a..507061063f 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -1048,8 +1048,8 @@ func TestMultiReaderInterceptors(t *testing.T) { StartMessageID: EarliestMessageID(), }) - assert.NoError(t, err) assert.NotNil(t, r) + assert.NoError(t, err) defer r.Close() for {