diff --git a/mocks/async_producer.go b/mocks/async_producer.go index d7a4860e4..418f077e6 100644 --- a/mocks/async_producer.go +++ b/mocks/async_producer.go @@ -152,10 +152,12 @@ func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError { // check the message. If an error is returned it will be made available on the Errors channel // otherwise the mock will handle the message as if it produced successfully, i.e. it will make it // available on the Successes channel if the Producer.Return.Successes setting is set to true. -func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed(cf MessageChecker) { +func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *AsyncProducer { mp.l.Lock() defer mp.l.Unlock() mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) + + return mp } // ExpectInputWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that a @@ -163,10 +165,12 @@ func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed(cf Mess // function to check the message. If an error is returned it will be made available on the Errors // channel otherwise the mock will handle the message as if it failed to produce successfully. This // means it will make a ProducerError available on the Errors channel. -func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) { +func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *AsyncProducer { mp.l.Lock() defer mp.l.Unlock() mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) + + return mp } // ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message @@ -174,8 +178,10 @@ func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail(cf Message // the message value. If an error is returned it will be made available on the Errors channel // otherwise the mock will handle the message as if it produced successfully, i.e. it will make // it available on the Successes channel if the Producer.Return.Successes setting is set to true. -func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) { +func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) *AsyncProducer { mp.ExpectInputWithMessageCheckerFunctionAndSucceed(messageValueChecker(cf)) + + return mp } // ExpectInputWithCheckerFunctionAndFail sets an expectation on the mock producer that a message @@ -183,21 +189,27 @@ func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecke // check the message value. If an error is returned it will be made available on the Errors channel // otherwise the mock will handle the message as if it failed to produce successfully. This means // it will make a ProducerError available on the Errors channel. -func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) { +func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) *AsyncProducer { mp.ExpectInputWithMessageCheckerFunctionAndFail(messageValueChecker(cf), err) + + return mp } // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided // on the input channel. The mock producer will handle the message as if it is produced successfully, // i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting // is set to true. -func (mp *AsyncProducer) ExpectInputAndSucceed() { +func (mp *AsyncProducer) ExpectInputAndSucceed() *AsyncProducer { mp.ExpectInputWithMessageCheckerFunctionAndSucceed(nil) + + return mp } // ExpectInputAndFail sets an expectation on the mock producer that a message will be provided // on the input channel. The mock producer will handle the message as if it failed to produce // successfully. This means it will make a ProducerError available on the Errors channel. -func (mp *AsyncProducer) ExpectInputAndFail(err error) { +func (mp *AsyncProducer) ExpectInputAndFail(err error) *AsyncProducer { mp.ExpectInputWithMessageCheckerFunctionAndFail(nil, err) + + return mp } diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index 280823a2b..29ed9238f 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -45,11 +45,10 @@ func TestMockAsyncProducerImplementsAsyncProducerInterface(t *testing.T) { func TestProducerReturnsExpectationsToChannels(t *testing.T) { config := NewTestConfig() config.Producer.Return.Successes = true - mp := NewAsyncProducer(t, config) - - mp.ExpectInputAndSucceed() - mp.ExpectInputAndSucceed() - mp.ExpectInputAndFail(sarama.ErrOutOfBrokers) + mp := NewAsyncProducer(t, config). + ExpectInputAndSucceed(). + ExpectInputAndSucceed(). + ExpectInputAndFail(sarama.ErrOutOfBrokers) mp.Input() <- &sarama.ProducerMessage{Topic: "test 1"} mp.Input() <- &sarama.ProducerMessage{Topic: "test 2"} @@ -95,9 +94,9 @@ func TestProducerWithTooFewExpectations(t *testing.T) { func TestProducerWithTooManyExpectations(t *testing.T) { trm := newTestReporterMock() - mp := NewAsyncProducer(trm, nil) - mp.ExpectInputAndSucceed() - mp.ExpectInputAndFail(sarama.ErrOutOfBrokers) + mp := NewAsyncProducer(trm, nil). + ExpectInputAndSucceed(). + ExpectInputAndFail(sarama.ErrOutOfBrokers) mp.Input() <- &sarama.ProducerMessage{Topic: "test"} if err := mp.Close(); err != nil { @@ -111,9 +110,9 @@ func TestProducerWithTooManyExpectations(t *testing.T) { func TestProducerWithCheckerFunction(t *testing.T) { trm := newTestReporterMock() - mp := NewAsyncProducer(trm, nil) - mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) - mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) + mp := NewAsyncProducer(trm, nil). + ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")). + ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} diff --git a/mocks/consumer.go b/mocks/consumer.go index baec88b51..c10ae4765 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -276,7 +276,7 @@ func (pc *PartitionConsumer) HighWaterMarkOffset() int64 { // message was consumed from the Messages channel, because there are legitimate // reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will // verify that the channel is empty on close. -func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) { +func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *PartitionConsumer { pc.l.Lock() defer pc.l.Unlock() @@ -285,6 +285,8 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) { msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) pc.messages <- msg + + return pc } // YieldError will yield an error on the Errors channel of this partition consumer @@ -292,24 +294,30 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) { // consumed from the Errors channel, because there are legitimate reasons for this // not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that // the channel is empty on close. -func (pc *PartitionConsumer) YieldError(err error) { +func (pc *PartitionConsumer) YieldError(err error) *PartitionConsumer { pc.errors <- &sarama.ConsumerError{ Topic: pc.topic, Partition: pc.partition, Err: err, } + + return pc } // ExpectMessagesDrainedOnClose sets an expectation on the partition consumer // that the messages channel will be fully drained when Close is called. If this // expectation is not met, an error is reported to the error reporter. -func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() { +func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() *PartitionConsumer { pc.messagesShouldBeDrained = true + + return pc } // ExpectErrorsDrainedOnClose sets an expectation on the partition consumer // that the errors channel will be fully drained when Close is called. If this // expectation is not met, an error is reported to the error reporter. -func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() { +func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() *PartitionConsumer { pc.errorsShouldBeDrained = true + + return pc } diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index e1202cc7a..9367d6aa1 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -143,10 +143,10 @@ func TestConsumerWithWrongOffsetExpectation(t *testing.T) { func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) { trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) - pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) - pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) - pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) - pcmock.ExpectMessagesDrainedOnClose() + consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest). + YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}). + YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}). + ExpectMessagesDrainedOnClose() pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) if err != nil { @@ -169,10 +169,10 @@ func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) { trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) - pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) - pcmock.YieldError(sarama.ErrInvalidMessage) - pcmock.YieldError(sarama.ErrInvalidMessage) - pcmock.ExpectErrorsDrainedOnClose() + consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest). + YieldError(sarama.ErrInvalidMessage). + YieldError(sarama.ErrInvalidMessage). + ExpectErrorsDrainedOnClose() pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) if err != nil { diff --git a/mocks/sync_producer.go b/mocks/sync_producer.go index 1220941c6..67e5352f5 100644 --- a/mocks/sync_producer.go +++ b/mocks/sync_producer.go @@ -149,48 +149,60 @@ func (sp *SyncProducer) Close() error { // that SendMessage will be called. The mock producer will first call the given function to check // the message. It will cascade the error of the function, if any, or handle the message as if it // produced successfully, i.e. by returning a valid partition, and offset, and a nil error. -func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed(cf MessageChecker) { +func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *SyncProducer { sp.l.Lock() defer sp.l.Unlock() sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) + + return sp } // ExpectSendMessageWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that // SendMessage will be called. The mock producer will first call the given function to check the // message. It will cascade the error of the function, if any, or handle the message as if it // failed to produce successfully, i.e. by returning the provided error. -func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) { +func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *SyncProducer { sp.l.Lock() defer sp.l.Unlock() sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) + + return sp } // ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage // will be called. The mock producer will first call the given function to check the message value. // It will cascade the error of the function, if any, or handle the message as if it produced // successfully, i.e. by returning a valid partition, and offset, and a nil error. -func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) { +func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) *SyncProducer { sp.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(messageValueChecker(cf)) + + return sp } // ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be // called. The mock producer will first call the given function to check the message value. // It will cascade the error of the function, if any, or handle the message as if it failed // to produce successfully, i.e. by returning the provided error. -func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) { +func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) *SyncProducer { sp.ExpectSendMessageWithMessageCheckerFunctionAndFail(messageValueChecker(cf), err) + + return sp } // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it produced successfully, i.e. by // returning a valid partition, and offset, and a nil error. -func (sp *SyncProducer) ExpectSendMessageAndSucceed() { +func (sp *SyncProducer) ExpectSendMessageAndSucceed() *SyncProducer { sp.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(nil) + + return sp } // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it failed to produce // successfully, i.e. by returning the provided error. -func (sp *SyncProducer) ExpectSendMessageAndFail(err error) { +func (sp *SyncProducer) ExpectSendMessageAndFail(err error) *SyncProducer { sp.ExpectSendMessageWithMessageCheckerFunctionAndFail(nil, err) + + return sp } diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go index 8c4582ba9..ca1f65f5d 100644 --- a/mocks/sync_producer_test.go +++ b/mocks/sync_producer_test.go @@ -58,9 +58,9 @@ func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) { func TestSyncProducerWithTooManyExpectations(t *testing.T) { trm := newTestReporterMock() - sp := NewSyncProducer(trm, nil) - sp.ExpectSendMessageAndSucceed() - sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers) + sp := NewSyncProducer(trm, nil). + ExpectSendMessageAndSucceed(). + ExpectSendMessageAndFail(sarama.ErrOutOfBrokers) msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} if _, _, err := sp.SendMessage(msg); err != nil { @@ -79,8 +79,7 @@ func TestSyncProducerWithTooManyExpectations(t *testing.T) { func TestSyncProducerWithTooFewExpectations(t *testing.T) { trm := newTestReporterMock() - sp := NewSyncProducer(trm, nil) - sp.ExpectSendMessageAndSucceed() + sp := NewSyncProducer(trm, nil).ExpectSendMessageAndSucceed() msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} if _, _, err := sp.SendMessage(msg); err != nil { @@ -102,9 +101,9 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) { func TestSyncProducerWithCheckerFunction(t *testing.T) { trm := newTestReporterMock() - sp := NewSyncProducer(trm, nil) - sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) - sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) + sp := NewSyncProducer(trm, nil). + ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")). + ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} if _, _, err := sp.SendMessage(msg); err != nil { @@ -127,9 +126,9 @@ func TestSyncProducerWithCheckerFunction(t *testing.T) { func TestSyncProducerWithCheckerFunctionForSendMessagesWithError(t *testing.T) { trm := newTestReporterMock() - sp := NewSyncProducer(trm, nil) - sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) - sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) + sp := NewSyncProducer(trm, nil). + ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")). + ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} msg2 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} @@ -151,8 +150,8 @@ func TestSyncProducerWithCheckerFunctionForSendMessagesWithError(t *testing.T) { func TestSyncProducerWithCheckerFunctionForSendMessagesWithoutError(t *testing.T) { trm := newTestReporterMock() - sp := NewSyncProducer(trm, nil) - sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + sp := NewSyncProducer(trm, nil). + ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} msgs := []*sarama.ProducerMessage{msg1} @@ -180,8 +179,8 @@ func TestSyncProducerWithCheckerFunctionForSendMessagesWithoutError(t *testing.T func TestSyncProducerSendMessagesExpectationsMismatchTooFew(t *testing.T) { trm := newTestReporterMock() - sp := NewSyncProducer(trm, nil) - sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + sp := NewSyncProducer(trm, nil). + ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} msg2 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} @@ -204,9 +203,9 @@ func TestSyncProducerSendMessagesExpectationsMismatchTooFew(t *testing.T) { func TestSyncProducerSendMessagesExpectationsMismatchTooMany(t *testing.T) { trm := newTestReporterMock() - sp := NewSyncProducer(trm, nil) - sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) - sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + sp := NewSyncProducer(trm, nil). + ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")). + ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} msgs := []*sarama.ProducerMessage{msg1} @@ -227,8 +226,8 @@ func TestSyncProducerSendMessagesExpectationsMismatchTooMany(t *testing.T) { func TestSyncProducerSendMessagesFaultyEncoder(t *testing.T) { trm := newTestReporterMock() - sp := NewSyncProducer(trm, nil) - sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + sp := NewSyncProducer(trm, nil). + ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) msg1 := &sarama.ProducerMessage{Topic: "test", Value: faultyEncoder("123")} msgs := []*sarama.ProducerMessage{msg1}