From ee17df907aecb136165f5a79baeb96ed3938a210 Mon Sep 17 00:00:00 2001 From: Nitish Vashishtha <7208852+nitishv@users.noreply.github.com> Date: Sun, 5 Jul 2020 18:42:05 -0700 Subject: [PATCH] [Issue 304][Reader] fixed panic in CreateReader API using custom MessageID for ReaderOptions (#305) ### Motivation User of the client's client's [CreateReader](https://github.com/apache/pulsar-client-go/blob/master/pulsar/client.go#L109) API can use a custom type satisfying the [MessageID](https://github.com/apache/pulsar-client-go/blob/master/pulsar/message.go#L108) interface, when using it as a value for `StartMessageID` in [ReaderOptions](https://github.com/apache/pulsar-client-go/blob/master/pulsar/reader.go#L48) argument for the mentioned API. The current reader creation does an untested type assertion here, when preparing the `consumerOptions` needed for creating a `partitionConsumer`. https://github.com/apache/pulsar-client-go/blob/master/pulsar/reader_impl.go#L64 This assertion of `MessageID` as `*messageID` will fail unless an instance of `MessageID` is created from one of these exported APIs because `messageID` is unexported https://github.com/apache/pulsar-client-go/blob/master/pulsar/message.go#L114-#L126 Note: `newMessageID` returns `*messageID` which satisfies `MessageID` interface as well. ### Modifications Test the type assertion of `MessageID` as `*messageID`, if it fails, re-create a new `MessageID` using this https://github.com/apache/pulsar-client-go/blob/975eb3781644ebe588fc142e53eadf39fe50341a/pulsar/impl_message.go#L97 This will ensure that the custom type can be re-created as a `*messageID` which can be used by `partitionConsumerOpts` --- pulsar/reader_impl.go | 17 +++++++-- pulsar/reader_test.go | 84 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 2 deletions(-) diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 1650f7123f481..b3399dc66946b 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -44,6 +44,19 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, newError(ResultInvalidConfiguration, "StartMessageID is required") } + var startMessageID *messageID + var ok bool + if startMessageID, ok = options.StartMessageID.(*messageID); !ok { + // a custom type satisfying MessageID may not be a *messageID + // so re-create *messageID using its data + deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize()) + if err != nil { + return nil, err + } + // de-serialized MessageID is a *messageID + startMessageID = deserMsgID.(*messageID) + } + subscriptionName := options.SubscriptionRolePrefix if subscriptionName == "" { subscriptionName = "reader" @@ -61,7 +74,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { subscription: subscriptionName, subscriptionType: Exclusive, receiverQueueSize: receiverQueueSize, - startMessageID: options.StartMessageID.(*messageID), + startMessageID: startMessageID, startMessageIDInclusive: options.StartMessageIDInclusive, subscriptionMode: nonDurable, readCompacted: options.ReadCompacted, @@ -80,8 +93,8 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { if err != nil { return nil, err } - pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq) + pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq) if err != nil { close(reader.messageCh) return nil, err diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index c98771d90a0a9..d99bfcb5f7c71 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -362,3 +362,87 @@ func TestReaderHasNext(t *testing.T) { assert.Equal(t, 10, i) } + +type myMessageID struct { + data []byte +} + +func (id *myMessageID) Serialize() []byte { + return id.data +} + +func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + msgIDs := [10]MessageID{} + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + msgIDs[i] = msgID + } + + // custom start message ID + myStartMsgID := &myMessageID{ + data: msgIDs[4].Serialize(), + } + + // attempt to create reader on 5th message (not included) + var reader Reader + assert.NotPanics(t, func() { + reader, err = client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: myStartMsgID, + }) + }) + + assert.Nil(t, err) + defer reader.Close() + + // receive the remaining 5 messages + for i := 5; i < 10; i++ { + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + expectMsg := fmt.Sprintf("hello-%d", i) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + } + + // create reader on 5th message (included) + readerInclusive, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: myStartMsgID, + StartMessageIDInclusive: true, + }) + + assert.Nil(t, err) + defer readerInclusive.Close() + + // receive the remaining 6 messages + for i := 4; i < 10; i++ { + msg, err := readerInclusive.Next(context.Background()) + assert.NoError(t, err) + + expectMsg := fmt.Sprintf("hello-%d", i) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + } +}