Skip to content

Commit

Permalink
[improve][client] Implement GetLastMSgID for Reader (#1087)
Browse files Browse the repository at this point in the history
### Motivation
Implement the GetLastMSgID API for Reader.

---------

Co-authored-by: Yunze Xu <xyzinfernity@163.com>
  • Loading branch information
liangyepianzhou and BewareMyPower authored Sep 13, 2023
1 parent 2a15a25 commit 7fef6a9
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pulsar/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,7 @@ type Reader interface {
// the message publish time where to reposition the subscription
//
SeekByTime(time time.Time) error

// GetLastMessageID get the last message id available for consume.
GetLastMessageID() (MessageID, error)
}
4 changes: 4 additions & 0 deletions pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,7 @@ func (r *reader) SeekByTime(time time.Time) error {

return r.pc.SeekByTime(time)
}

func (r *reader) GetLastMessageID() (MessageID, error) {
return r.pc.getLastMessageID()
}
42 changes: 42 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,3 +901,45 @@ func TestReaderWithBackoffPolicy(t *testing.T) {
partitionConsumerImp.reconnectToBroker()
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
}

func TestReaderGetLastMessageID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
topic := newTopicName()
ctx := context.Background()
schema := NewStringSchema(nil)
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
Schema: schema,
})
assert.Nil(t, err)
defer producer.Close()

var lastMsgID MessageID
// send 10 messages
for i := 0; i < 10; i++ {
msgID, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.NoError(t, err)
assert.NotNil(t, msgID)
lastMsgID = msgID
}

reader, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: EarliestMessageID(),
})
assert.Nil(t, err)
getLastMessageID, err := reader.GetLastMessageID()
if err != nil {
return
}

assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID())
assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID())
}

0 comments on commit 7fef6a9

Please sign in to comment.