diff --git a/producer.go b/producer.go old mode 100644 new mode 100755 index fa28d19..b729433 --- a/producer.go +++ b/producer.go @@ -1627,6 +1627,29 @@ func (self *TopicProducerMgr) MultiPublishAndTrace(topic string, traceIDList []u return id, offset, rawSize, err } +func (self *TopicProducerMgr) PublishWithTimeout(topic string, body []byte, time time.Duration) error { + _, err := self.doCommandWithTimeoutAndRetry(topic, nil, time, 2, func(pid int) (*Command, error) { + if pid < 0 || pid == OLD_VERSION_PID { + // pub to old nsqd that not support partition + return Publish(topic, body), nil + } + return PublishWithPart(topic, strconv.Itoa(pid), body), nil + }) + return err +} + +func (self *TopicProducerMgr) PublishWithTimeoutAndPartition(topic string, partition int, body []byte, time time.Duration) error { + _, err := self.doCommandWithTimeoutAndRetryAndPartition(topic, partition, time, 2, func(pid int) (*Command, error) { + if pid < 0 || pid == OLD_VERSION_PID { + // pub to old nsqd that not support partition + return Publish(topic, body), nil + } + return PublishWithPart(topic, strconv.Itoa(pid), body), nil + }) + return err +} + + func (self *TopicProducerMgr) doCommandWithTimeoutAndRetryAndPartition(topic string, partition int, timeout time.Duration, maxRetry uint32, commandFunc func(pid int) (*Command, error)) ([]byte, error) { return self.doCommandWithTimeoutAndRetryTemplate(topic, timeout, maxRetry, commandFunc, diff --git a/producer_test.go b/producer_test.go old mode 100644 new mode 100755 index 6ba8918..28b6dac --- a/producer_test.go +++ b/producer_test.go @@ -10,6 +10,7 @@ import ( "os" "runtime" "strconv" + "strings" "sync" "sync/atomic" "testing" @@ -559,6 +560,58 @@ func TestTopicProducerMgrPubBackground(t *testing.T) { time.Sleep(time.Second * 5) } +func TestProducerMgrPublishWithTimeout(t *testing.T) { + + topicName := "topic_producer_mgr_pub_timeout" + strconv.Itoa(int(time.Now().Unix())) + EnsureTopic(t, 4150, topicName, 0) + + testingTimeout = true + testingSendTimeout = true + defer func() { + testingSendTimeout = false + testingTimeout = false + }() + time.Sleep(time.Second) + + config := NewConfig() + config.PubTimeout = time.Second * 2 + config.PubMaxBackgroundRetry = 10 + config.PubStrategy = PubRR + w, err := NewTopicProducerMgr([]string{topicName}, config) + if err != nil { + t.Fatal(err) + } + w.SetLogger(newTestLogger(t), LogLevelDebug) + lookupList := make([]string, 0) + lookupList = append(lookupList, "127.0.0.1:4161") + w.AddLookupdNodes(lookupList) + defer w.Stop() + + moreBeginTime := time.Now().Unix() + err = w.PublishWithTimeout(topicName, []byte("publish_test_case"), time.Second * 3) + if time.Now().Unix() - moreBeginTime <= 3 { + t.Fatalf("func timeout param have not override config timeout") + } + checkDeadlineExceeded(err, t) + + + lessBeginTime := time.Now().Unix() + err = w.PublishWithTimeout(topicName, []byte("publish_test_case"), time.Second) + if time.Now().Unix() - lessBeginTime >= 2 { + t.Fatalf("func timeout param have not override config timeout") + } + checkDeadlineExceeded(err, t) +} + +func checkDeadlineExceeded(err error, t *testing.T) { + if err != context.DeadlineExceeded { + if err != nil && strings.Contains(err.Error(), "deadline exceeded") { + return + } + t.Fatalf("error %s", err) + } +} + func TestTopicProducerMgrPubOrdered(t *testing.T) { stopC := make(chan struct{}) var meta metaInfo