Skip to content

Commit

Permalink
[Issue 304][Reader] fixed panic in CreateReader API using custom Mess…
Browse files Browse the repository at this point in the history
…ageID for ReaderOptions (apache#305)

### Motivation

User of the client's client's [CreateReader](https://github.com/apache/pulsar-client-go/blob/master/pulsar/client.go#L109) API can use a custom type satisfying the [MessageID](https://github.com/apache/pulsar-client-go/blob/master/pulsar/message.go#L108) interface, when using it as a value for `StartMessageID` in [ReaderOptions](https://github.com/apache/pulsar-client-go/blob/master/pulsar/reader.go#L48) argument for the mentioned API.

The current reader creation does an untested type assertion here, when preparing the `consumerOptions` needed for creating a `partitionConsumer`.
https://github.com/apache/pulsar-client-go/blob/master/pulsar/reader_impl.go#L64 

This assertion of `MessageID` as `*messageID` will fail unless an instance of `MessageID` is created from one of these exported APIs because `messageID` is unexported
https://github.com/apache/pulsar-client-go/blob/master/pulsar/message.go#L114-#L126
Note: `newMessageID` returns `*messageID` which satisfies `MessageID` interface as well.


### Modifications

Test the type assertion of `MessageID` as `*messageID`, if it fails, re-create a new `MessageID` using this
https://github.com/apache/pulsar-client-go/blob/975eb3781644ebe588fc142e53eadf39fe50341a/pulsar/impl_message.go#L97
This will ensure that the custom type can be re-created as a `*messageID` which can be used by `partitionConsumerOpts`
  • Loading branch information
nitishv authored Jul 6, 2020
1 parent 975eb37 commit ee17df9
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 2 deletions.
17 changes: 15 additions & 2 deletions pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
return nil, newError(ResultInvalidConfiguration, "StartMessageID is required")
}

var startMessageID *messageID
var ok bool
if startMessageID, ok = options.StartMessageID.(*messageID); !ok {
// a custom type satisfying MessageID may not be a *messageID
// so re-create *messageID using its data
deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize())
if err != nil {
return nil, err
}
// de-serialized MessageID is a *messageID
startMessageID = deserMsgID.(*messageID)
}

subscriptionName := options.SubscriptionRolePrefix
if subscriptionName == "" {
subscriptionName = "reader"
Expand All @@ -61,7 +74,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
subscription: subscriptionName,
subscriptionType: Exclusive,
receiverQueueSize: receiverQueueSize,
startMessageID: options.StartMessageID.(*messageID),
startMessageID: startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
subscriptionMode: nonDurable,
readCompacted: options.ReadCompacted,
Expand All @@ -80,8 +93,8 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
if err != nil {
return nil, err
}
pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq)

pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq)
if err != nil {
close(reader.messageCh)
return nil, err
Expand Down
84 changes: 84 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,87 @@ func TestReaderHasNext(t *testing.T) {

assert.Equal(t, 10, i)
}

type myMessageID struct {
data []byte
}

func (id *myMessageID) Serialize() []byte {
return id.data
}

func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
ctx := context.Background()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
msgIDs := [10]MessageID{}
for i := 0; i < 10; i++ {
msgID, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.NoError(t, err)
assert.NotNil(t, msgID)
msgIDs[i] = msgID
}

// custom start message ID
myStartMsgID := &myMessageID{
data: msgIDs[4].Serialize(),
}

// attempt to create reader on 5th message (not included)
var reader Reader
assert.NotPanics(t, func() {
reader, err = client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: myStartMsgID,
})
})

assert.Nil(t, err)
defer reader.Close()

// receive the remaining 5 messages
for i := 5; i < 10; i++ {
msg, err := reader.Next(context.Background())
assert.NoError(t, err)

expectMsg := fmt.Sprintf("hello-%d", i)
assert.Equal(t, []byte(expectMsg), msg.Payload())
}

// create reader on 5th message (included)
readerInclusive, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: myStartMsgID,
StartMessageIDInclusive: true,
})

assert.Nil(t, err)
defer readerInclusive.Close()

// receive the remaining 6 messages
for i := 4; i < 10; i++ {
msg, err := readerInclusive.Next(context.Background())
assert.NoError(t, err)

expectMsg := fmt.Sprintf("hello-%d", i)
assert.Equal(t, []byte(expectMsg), msg.Payload())
}
}

0 comments on commit ee17df9

Please sign in to comment.