Skip to content

Commit

Permalink
Add some test cases for project (apache#58)
Browse files Browse the repository at this point in the history
### Motivation

- Add some test cases as follows:

    - TestConsumer_EventTime
    - TestNonPersistentTopic
    - TestConsumer_Flow

- Fix consumer connection closed
- Add `pprof` for debug project
- Fix `flow` command logic
  • Loading branch information
wolfstudy authored and jiazhai committed Aug 22, 2019
1 parent 49b7a3c commit de147fa
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 37 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@

# Output of the go coverage tool
*.out

perf/perf
16 changes: 16 additions & 0 deletions perf/pulsar-perf-go.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
package main

import (
"fmt"
"net"
"net/http"
_ "net/http/pprof"

"github.com/spf13/cobra"

log "github.com/sirupsen/logrus"
Expand All @@ -30,6 +35,17 @@ type ClientArgs struct {
var clientArgs ClientArgs

func main() {
// use `go tool pprof http://localhost:3000/debug/pprof/profile` to get pprof file(cpu info)
// use `go tool pprof http://localhost:3000/debug/pprof/heap` to get inuse_space file
go func() {
listenAddr := net.JoinHostPort("localhost", "3000")
fmt.Printf("Profile server listening on %s\n", listenAddr)
profileRedirect := http.RedirectHandler("/debug/pprof", http.StatusSeeOther)
http.Handle("/", profileRedirect)
err := fmt.Errorf("%v", http.ListenAndServe(listenAddr, nil))
fmt.Println(err.Error())
}()

log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
TimestampFormat: "15:04:05.000",
Expand Down
75 changes: 75 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,3 +790,78 @@ func TestConsumer_Seek(t *testing.T) {
t.Logf("again received message:%+v", msg.ID())
assert.Equal(t, "msg-content-4", string(msg.Payload()))
}

func TestConsumer_EventTime(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topicName := "test-event-time"
ctx := context.Background()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
})
assert.Nil(t, err)
defer consumer.Close()

et := timeFromUnixTimestampMillis(uint64(5))
err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("test"),
EventTime: &et,
})
assert.Nil(t, err)

msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, et, msg.EventTime())
assert.Equal(t, "test", string(msg.Payload()))
}

func TestConsumer_Flow(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topicName := "test-received-since-flow"
ctx := context.Background()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
ReceiverQueueSize: 4,
})

for msgNum := 0; msgNum < 100; msgNum++ {
if err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
}); err != nil {
t.Fatal(err)
}
}

for msgNum := 0; msgNum < 100; msgNum++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", msgNum), string(msg.Payload()))
}
}
5 changes: 4 additions & 1 deletion pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ const (
//BrokerMetadataError Result = 10 // Broker failed in updating metadata
//BrokerPersistenceError Result = 11 // Broker failed to persist entry
//ChecksumError Result = 12 // Corrupt message checksum failure
//ConsumerBusy Result = 13 // Exclusive consumer is already connected
// ConsumerBusy means Exclusive consumer is already connected
ConsumerBusy Result = 13
//NotConnectedError Result = 14 // Producer/Consumer is not currently connected to broker
//AlreadyClosedError Result = 15 // Producer/Consumer is already closed and not accepting any operation
//InvalidMessage Result = 16 // Error in publishing an already used message
Expand Down Expand Up @@ -104,6 +105,8 @@ func getResultStr(r Result) string {
return "InvalidTopicName"
case ResultConnectError:
return "ConnectError"
case ConsumerBusy:
return "ConsumerBusy"
default:
return fmt.Sprintf("Result(%d)", r)
}
Expand Down
61 changes: 33 additions & 28 deletions pulsar/impl_partition_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,27 @@ type partitionConsumer struct {
omu sync.Mutex // protects following
redeliverMessages []*pb.MessageIdData

unAckTracker *UnackedMessageTracker
unAckTracker *UnackedMessageTracker
receivedSinceFlow uint32

eventsChan chan interface{}
partitionIdx int
partitionNum int
}

func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan ConsumerMessage) (*partitionConsumer, error) {
func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan<- ConsumerMessage) (*partitionConsumer, error) {
c := &partitionConsumer{
state: consumerInit,
client: client,
topic: topic,
options: options,
log: log.WithField("topic", topic),
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: partitionID,
partitionNum: partitionNum,
eventsChan: make(chan interface{}, 1),
subQueue: make(chan ConsumerMessage, options.ReceiverQueueSize),
state: consumerInit,
client: client,
topic: topic,
options: options,
log: log.WithField("topic", topic),
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: partitionID,
partitionNum: partitionNum,
eventsChan: make(chan interface{}, 1),
subQueue: make(chan ConsumerMessage, options.ReceiverQueueSize),
receivedSinceFlow: 0,
}

c.setDefault(options)
Expand Down Expand Up @@ -167,7 +169,7 @@ func (pc *partitionConsumer) grabCnx() error {
return err
}

pc.log.Infof("Lookup result: %v", lr)
pc.log.Debugf("Lookup result: %v", lr)
requestID := pc.client.rpcClient.NewRequestID()
res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{
Expand Down Expand Up @@ -262,26 +264,33 @@ func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
return nil
}

func (pc *partitionConsumer) increaseAvailablePermits(receivedSinceFlow uint32) error {
highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1))
if receivedSinceFlow >= highwater {
if err := pc.internalFlow(receivedSinceFlow); err != nil {
pc.log.Errorf("Send Flow cmd error:%s", err.Error())
func (pc *partitionConsumer) increaseAvailablePermits() error {
pc.receivedSinceFlow++
highWater := uint32(math.Max(float64(pc.options.ReceiverQueueSize/2), 1))

pc.log.Debugf("receivedSinceFlow size is: %d, highWater size is: %d", pc.receivedSinceFlow, highWater)

// send flow request after 1/2 of the queue has been consumed
if pc.receivedSinceFlow >= highWater {
pc.log.Debugf("send flow command to broker, permits size is: %d", pc.receivedSinceFlow)
err := pc.internalFlow(pc.receivedSinceFlow)
if err != nil {
pc.log.Errorf("Send flow cmd error:%s", err.Error())
pc.receivedSinceFlow = 0
return err
}
receivedSinceFlow = 0
pc.receivedSinceFlow = 0
}
return nil
}

func (pc *partitionConsumer) messageProcessed(msgID MessageID, receivedSinceFlow uint32) error {
func (pc *partitionConsumer) messageProcessed(msgID MessageID) error {
err := pc.trackMessage(msgID)
if err != nil {
return err
}
receivedSinceFlow++

err = pc.increaseAvailablePermits(receivedSinceFlow)
err = pc.increaseAvailablePermits()
if err != nil {
return err
}
Expand All @@ -303,19 +312,16 @@ func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err
}

func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
var receivedSinceFlow uint32

for {
select {
case tmpMsg, ok := <-pc.subQueue:
if ok {
msgs <- tmpMsg

err := pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
err := pc.messageProcessed(tmpMsg.ID())
if err != nil {
return err
}
receivedSinceFlow = 0
continue
}
break
Expand All @@ -326,13 +332,12 @@ func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Consu
}

func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
var receivedSinceFlow uint32
var err error

select {
case tmpMsg, ok := <-pc.subQueue:
if ok {
err = pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
err = pc.messageProcessed(tmpMsg.ID())
callback(tmpMsg.Message, err)
if err != nil {
pc.log.Errorf("processed messages error:%s", err.Error())
Expand Down
26 changes: 18 additions & 8 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,30 +314,36 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by

switch *cmd.Type {
case pb.BaseCommand_SUCCESS:
c.handleResponse(*cmd.Success.RequestId, cmd)
c.handleResponse(cmd.Success.GetRequestId(), cmd)

case pb.BaseCommand_PRODUCER_SUCCESS:
c.handleResponse(*cmd.ProducerSuccess.RequestId, cmd)
c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)

case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
c.handleResponse(*cmd.PartitionMetadataResponse.RequestId, cmd)
c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), cmd)

case pb.BaseCommand_LOOKUP_RESPONSE:
c.handleResponse(*cmd.LookupTopicResponse.RequestId, cmd)
lookupResult := cmd.LookupTopicResponse
c.handleResponse(lookupResult.GetRequestId(), cmd)

case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
c.handleResponse(*cmd.ConsumerStatsResponse.RequestId, cmd)
c.handleResponse(cmd.ConsumerStatsResponse.GetRequestId(), cmd)

case pb.BaseCommand_GET_LAST_MESSAGE_ID_RESPONSE:
c.handleResponse(*cmd.GetLastMessageIdResponse.RequestId, cmd)
c.handleResponse(cmd.GetLastMessageIdResponse.GetRequestId(), cmd)

case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE:
c.handleResponse(*cmd.GetTopicsOfNamespaceResponse.RequestId, cmd)
c.handleResponse(cmd.GetTopicsOfNamespaceResponse.GetRequestId(), cmd)

case pb.BaseCommand_GET_SCHEMA_RESPONSE:
c.handleResponse(*cmd.GetSchemaResponse.RequestId, cmd)
c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)

case pb.BaseCommand_ERROR:
if cmd.Error != nil {
c.log.Errorf("Error: %s, Error Message: %s", cmd.Error.GetError(), cmd.Error.GetMessage())
c.Close()
return
}
case pb.BaseCommand_CLOSE_PRODUCER:
c.handleCloseProducer(cmd.GetCloseProducer())
case pb.BaseCommand_CLOSE_CONSUMER:
Expand Down Expand Up @@ -501,6 +507,10 @@ func (c *connection) Close() {
for _, listener := range c.listeners {
listener.ConnectionClosed()
}

for _, cnx := range c.connWrapper.Consumers {
cnx.ConnectionClosed()
}
}

func (c *connection) changeState(state connectionState) {
Expand Down
23 changes: 23 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,3 +486,26 @@ func TestMessageRouter(t *testing.T) {
assert.NotNil(t, msg)
assert.Equal(t, string(msg.Payload()), "hello")
}

func TestNonPersistentTopic(t *testing.T) {
topicName := "non-persistent://public/default/testNonPersistentTopic"
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer client.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})

assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
defer consumer.Close()
}

0 comments on commit de147fa

Please sign in to comment.