From 2a4b3822004e795815a75df9f75296d7adbf7680 Mon Sep 17 00:00:00 2001 From: Artyom Blagov Date: Thu, 26 Jul 2018 16:33:23 +0300 Subject: [PATCH] add long-running test to rabbitmq to check reconnections --- queues/rabbitmq/rabbitmq_test.go | 51 ++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/queues/rabbitmq/rabbitmq_test.go b/queues/rabbitmq/rabbitmq_test.go index ba39966..072935a 100644 --- a/queues/rabbitmq/rabbitmq_test.go +++ b/queues/rabbitmq/rabbitmq_test.go @@ -2,6 +2,7 @@ package rabbitmq import ( "fmt" + "strconv" "sync" "testing" "time" @@ -146,3 +147,53 @@ func TestPublisher(t *testing.T) { transport.Stop() <-doneChan } + +// 1. Start RabbitMQ +// 2. Start this test +// 3. After receiving some messages, shutdown RabbitMQ (or somehow drop connection) +// 4. Restore connection by starting RabbitMQ +// 5. Messages should continue to receive +func TestLong(t *testing.T) { + is := is.New(t) + + transport := New() + var wg sync.WaitGroup + doneChan := make(chan struct{}) + + waitChan := make(chan struct{}) + var once sync.Once + + go func() { + receiveChan := transport.Receive("test_send") + defer close(doneChan) + defer fmt.Println("receiving done") + for { + select { + case <-transport.Done(): + return + case err := <-transport.ErrChan(): + is.NoErr(err) + case msg := <-receiveChan: + fmt.Println("receive", string(msg)) + default: + once.Do(func() { + close(waitChan) + }) + } + } + }() + + <-waitChan + + wg.Add(1) + sendChan := transport.Send("test_send") + for i := 0; i < 1000; i++ { + fmt.Println("send", i, "message") + sendChan <- []byte(strconv.Itoa(i)) + time.Sleep(time.Second) + } + + wg.Wait() + transport.Stop() + <-doneChan +}