Skip to content

Commit

Permalink
QoL / Perf improvements and more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tristan Hyams committed Sep 15, 2019
1 parent 3b9ef70 commit 5fbb4c0
Show file tree
Hide file tree
Showing 24 changed files with 423 additions and 110 deletions.
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,59 @@ Just remember Channels get closed or get killed all the time, you don't want thi

---

<details><summary>Click here for some Channel Pool benchmarks!</summary>
<p>

This is a raw AMQP publish test. We create an AMQP connection, create an AMQP channel, and execute an AMQP publish looped.
MessageCount: 100,000
MessageSize: 2500 (2.5KB)

PS C:\GitHub\personal\turbocookedrabbit> go.exe test -run "^(TestCreateSingleChannelAndPublish)$" -v
=== RUN TestCreateSingleChannelAndPublish
--- PASS: TestCreateSingleChannelAndPublish (4.78s)
main_benchpool_test.go:17: 2019-09-15 07:39:48.3578722 -0400 EDT m=+0.091806701: Benchmark Starts
main_benchpool_test.go:49: 2019-09-15 07:39:53.1357496 -0400 EDT m=+4.869684101: Benchmark End
main_benchpool_test.go:50: 2019-09-15 07:39:53.1357496 -0400 EDT m=+4.869684101: Time Elapsed 4.7778774s
main_benchpool_test.go:51: 2019-09-15 07:39:53.1357496 -0400 EDT m=+4.869684101: Publish Errors 0
main_benchpool_test.go:52: 2019-09-15 07:39:53.1357496 -0400 EDT m=+4.869684101: Msgs/s 20929.796148
main_benchpool_test.go:53: 2019-09-15 07:39:53.1357496 -0400 EDT m=+4.869684101: KB/s 0.523245
PASS
ok github.com/houseofcat/turbocookedrabbit 6.512s

Apples to Apples comparison using a ChannelPool. As you can see - the numbers went up - but should have been relatively the same. Shere is some variability with these tests. The important thing to note is that there isn't a significant reduction in performance.

PS C:\GitHub\personal\turbocookedrabbit> go.exe test -run "^(TestGetSingleChannelFromPoolAndPublish)$" -v
=== RUN TestGetSingleChannelFromPoolAndPublish
--- PASS: TestGetSingleChannelFromPoolAndPublish (4.32s)
main_benchpool_test.go:59: 2019-09-15 07:47:19.0276488 -0400 EDT m=+0.090818001: Benchmark Starts
main_benchpool_test.go:89: 2019-09-15 07:47:23.3516406 -0400 EDT m=+4.414809801: Benchmark End
main_benchpool_test.go:90: 2019-09-15 07:47:23.3516406 -0400 EDT m=+4.414809801: Time Elapsed 4.3239918s
main_benchpool_test.go:91: 2019-09-15 07:47:23.3516406 -0400 EDT m=+4.414809801: Publish Errors 0
main_benchpool_test.go:92: 2019-09-15 07:47:23.3516406 -0400 EDT m=+4.414809801: Msgs/s 23126.778363
main_benchpool_test.go:93: 2019-09-15 07:47:23.3516406 -0400 EDT m=+4.414809801: KB/s 0.578169
PASS
ok github.com/houseofcat/turbocookedrabbit 4.506s

Apples to Apple Orange comparison same premise, but different ChannelHost per Publish.

PS C:\GitHub\personal\turbocookedrabbit> go.exe test -run "^(TestGetMultiChannelFromPoolAndPublish)$" -v
=== RUN TestGetMultiChannelFromPoolAndPublish
--- PASS: TestGetMultiChannelFromPoolAndPublish (3.72s)
main_benchpool_test.go:99: 2019-09-15 07:40:44.1532264 -0400 EDT m=+0.086463601: Benchmark Starts
main_benchpool_test.go:135: 2019-09-15 07:40:47.8682928 -0400 EDT m=+3.801530001: Benchmark End
main_benchpool_test.go:136: 2019-09-15 07:40:47.8682928 -0400 EDT m=+3.801530001: Time Elapsed 3.7150664s
main_benchpool_test.go:137: 2019-09-15 07:40:47.8682928 -0400 EDT m=+3.801530001: ChannelPool Errors 0
main_benchpool_test.go:138: 2019-09-15 07:40:47.8682928 -0400 EDT m=+3.801530001: Publish Errors 0
main_benchpool_test.go:139: 2019-09-15 07:40:47.8682928 -0400 EDT m=+3.801530001: Msgs/s 26917.419296
main_benchpool_test.go:140: 2019-09-15 07:40:47.8682928 -0400 EDT m=+3.801530001: KB/s 0.672935
PASS
ok github.com/houseofcat/turbocookedrabbit 3.893s

</p>
</details>

---

## The Topologer

<details><summary>How do I create/delete/bind queues and exchanges?</summary>
Expand Down
21 changes: 8 additions & 13 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,10 @@ func (con *Consumer) Get(queueName string, autoAck bool) (*models.Message, error
return nil, err
}

defer con.channelPool.ReturnChannel(chanHost)

// Get Single Message
amqpDelivery, ok, getErr := chanHost.Channel.Get(queueName, autoAck)
if getErr != nil {
con.channelPool.FlagChannel(chanHost.ChannelID)
con.channelPool.ReturnChannel(chanHost)
con.channelPool.ReturnChannel(chanHost, true)
return nil, getErr
}

Expand All @@ -153,7 +150,7 @@ func (con *Consumer) Get(queueName string, autoAck bool) (*models.Message, error
amqpDelivery.DeliveryTag,
chanHost.Channel), nil
}

con.channelPool.ReturnChannel(chanHost, false)
return nil, nil
}

Expand Down Expand Up @@ -189,8 +186,7 @@ GetBatchLoop:

amqpDelivery, ok, getErr := chanHost.Channel.Get(queueName, autoAck)
if getErr != nil {
con.channelPool.FlagChannel(chanHost.ChannelID)
con.channelPool.ReturnChannel(chanHost)
con.channelPool.ReturnChannel(chanHost, true)
return nil, getErr
}

Expand Down Expand Up @@ -290,7 +286,7 @@ func (con *Consumer) getDeliveryChannel() (<-chan amqp.Delivery, *models.Channel
// Start Consuming
deliveryChan, err := chanHost.Channel.Consume(con.QueueName, con.ConsumerName, con.autoAck, con.exclusive, false, con.noWait, nil)
if err != nil {
con.handleErrorAndFlagChannel(err, chanHost.ChannelID)
con.handleErrorAndChannel(err, chanHost)
return nil, nil, err // Retry
}

Expand All @@ -307,8 +303,7 @@ ProcessDeliveriesInnerLoop:
select {
case errorMessage := <-chanHost.CloseErrors():
if errorMessage != nil {
con.handleErrorAndFlagChannel(fmt.Errorf("consumer's current channel closed\r\n[reason: %s]\r\n[code: %d]", errorMessage.Reason, errorMessage.Code), chanHost.ChannelID)
con.channelPool.ReturnChannel(chanHost)
con.handleErrorAndChannel(fmt.Errorf("consumer's current channel closed\r\n[reason: %s]\r\n[code: %d]", errorMessage.Reason, errorMessage.Code), chanHost)
break ProcessDeliveriesInnerLoop
}
default:
Expand All @@ -329,7 +324,7 @@ ProcessDeliveriesInnerLoop:
select {
case stop := <-con.consumeStop:
if stop {
con.channelPool.ReturnChannel(chanHost)
con.channelPool.ReturnChannel(chanHost, false)
return true
}
default:
Expand Down Expand Up @@ -368,8 +363,8 @@ func (con *Consumer) Messages() <-chan *models.Message {
return con.messages
}

func (con *Consumer) handleErrorAndFlagChannel(err error, channelID uint64) {
con.channelPool.FlagChannel(channelID)
func (con *Consumer) handleErrorAndChannel(err error, chanHost *models.ChannelHost) {
con.channelPool.ReturnChannel(chanHost, true)
con.handleError(err)
}

Expand Down
3 changes: 3 additions & 0 deletions consumer/testconsumerseasoning.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
"ErrorBuffer": 10,
"SleepOnErrorInterval": 5000,
"MaxConnectionCount": 10,
"HeartbeatInterval": 5,
"ConnectionTimeout": 10,
"TLSConfig": {
"EnableTLS": false,
"PEMCertLocation": "test/catest.pem",
Expand Down Expand Up @@ -49,6 +51,7 @@
},
"PublisherConfig":{
"SleepOnIdleInterval": 0,
"SleepOnQueueFullInterval": 100,
"SleepOnErrorInterval": 1000,
"LetterBuffer": 1000,
"MaxOverBuffer": 1000,
Expand Down
Binary file removed cpuDuration.prof
Binary file not shown.
185 changes: 185 additions & 0 deletions main_benchpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package main_test

import (
"context"
"runtime/trace"
"testing"
"time"

"github.com/houseofcat/turbocookedrabbit/pools"
"github.com/houseofcat/turbocookedrabbit/utils"
"github.com/streadway/amqp"
)

func TestCreateSingleChannelAndPublish(t *testing.T) {

startTime := time.Now()
t.Logf("%s: Benchmark Starts\r\n", startTime)

messageCount := 100000
messageSize := 2500
publishErrors := 0

letter := utils.CreateLetter("", "ConsumerTestQueue", utils.RandomBytes(messageSize))

amqpConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
t.Error(err)
return
}
amqpChan, err := amqpConn.Channel()
if err != nil {
t.Error(err)
return
}

for i := 0; i < messageCount; i++ {

err := amqpChan.Publish(
letter.Envelope.Exchange,
letter.Envelope.RoutingKey,
letter.Envelope.Mandatory,
true,
amqp.Publishing{
ContentType: letter.Envelope.ContentType,
Body: letter.Body,
})
if err != nil {
publishErrors++
}
}

amqpChan.Close()
amqpConn.Close()

duration := time.Since(startTime)

t.Logf("%s: Benchmark End\r\n", time.Now())
t.Logf("%s: Time Elapsed %s\r\n", time.Now(), duration)
t.Logf("%s: Publish Errors %d\r\n", time.Now(), publishErrors)
t.Logf("%s: Msgs/s %f\r\n", time.Now(), float64(messageCount)/duration.Seconds())
t.Logf("%s: KB/s %f\r\n", time.Now(), (float64(messageSize)/duration.Seconds())/1000)
}

func TestGetSingleChannelFromPoolAndPublish(t *testing.T) {

startTime := time.Now()
t.Logf("%s: Benchmark Starts\r\n", startTime)

messageCount := 100000
messageSize := 2500

letter := utils.CreateLetter("", "ConsumerTestQueue", utils.RandomBytes(messageSize))

channelHost, err := ChannelPool.GetChannel()
if err != nil {
t.Error(err)
return
}
publishErrors := 0

for i := 0; i < messageCount; i++ {

err := channelHost.Channel.Publish(
letter.Envelope.Exchange,
letter.Envelope.RoutingKey,
letter.Envelope.Mandatory,
letter.Envelope.Immediate,
amqp.Publishing{
ContentType: letter.Envelope.ContentType,
Body: letter.Body,
})
if err != nil {
if publishErrors == 0 {
t.Error(err)
}
publishErrors++
}
}

ChannelPool.ReturnChannel(channelHost, false)

duration := time.Since(startTime)

t.Logf("%s: Benchmark End\r\n", time.Now())
t.Logf("%s: Time Elapsed %s\r\n", time.Now(), duration)
t.Logf("%s: Publish Errors %d\r\n", time.Now(), publishErrors)
t.Logf("%s: Msgs/s %f\r\n", time.Now(), float64(messageCount)/duration.Seconds())
t.Logf("%s: KB/s %f\r\n", time.Now(), (float64(messageSize)/duration.Seconds())/1000)
}

func TestGetMultiChannelFromPoolAndPublish(t *testing.T) {

startTime := time.Now()
t.Logf("%s: Benchmark Starts\r\n", startTime)

poolErrors := 0
publishErrors := 0
messageCount := 100000
messageSize := 2500

letter := utils.CreateLetter("", "ConsumerTestQueue", utils.RandomBytes(messageSize))

for i := 0; i < messageCount; i++ {
channelHost, err := ChannelPool.GetChannel()
if err != nil {
poolErrors++
continue
}

err = channelHost.Channel.Publish(
letter.Envelope.Exchange,
letter.Envelope.RoutingKey,
letter.Envelope.Mandatory,
letter.Envelope.Immediate,
amqp.Publishing{
ContentType: letter.Envelope.ContentType,
Body: letter.Body,
})
if err != nil {
publishErrors++
ChannelPool.ReturnChannel(channelHost, true)
continue
}

ChannelPool.ReturnChannel(channelHost, false)
}

duration := time.Since(startTime)

t.Logf("%s: Benchmark End\r\n", time.Now())
t.Logf("%s: Time Elapsed %s\r\n", time.Now(), duration)
t.Logf("%s: ChannelPool Errors %d\r\n", time.Now(), poolErrors)
t.Logf("%s: Publish Errors %d\r\n", time.Now(), publishErrors)
t.Logf("%s: Msgs/s %f\r\n", time.Now(), float64(messageCount)/duration.Seconds())
t.Logf("%s: KB/s %f\r\n", time.Now(), (float64(messageSize)/duration.Seconds())/1000)
}

func BenchmarkGetChannelAndPublish(b *testing.B) {
b.ReportAllocs()
_, task := trace.NewTask(context.Background(), "BenchmarkGetChannel")
defer task.End()

channelPool, _ := pools.NewChannelPool(Seasoning.PoolConfig, nil, true)
letter := utils.CreateLetter("", "ConsumerTestQueue", utils.RandomBytes(1000))
poolErrors := 0

for i := 0; i < 1000; i++ {
channelHost, err := channelPool.GetChannel()
if err != nil {
poolErrors++
continue
}
channelHost.Channel.Publish(
letter.Envelope.Exchange,
letter.Envelope.RoutingKey,
letter.Envelope.Mandatory,
letter.Envelope.Immediate,
amqp.Publishing{
ContentType: letter.Envelope.ContentType,
Body: letter.Body,
})

channelPool.ReturnChannel(channelHost, false)
}
}
Binary file removed memDuration.prof
Binary file not shown.
13 changes: 8 additions & 5 deletions models/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type ChannelPoolConfig struct {
// ConnectionPoolConfig represents settings for creating connection pools.
type ConnectionPoolConfig struct {
URI string `json:"URI"`
Heartbeat uint32 `json:"Heartbeat"`
ConnectionTimeout uint32 `json:"ConnectionTimeout"`
ErrorBuffer uint16 `json:"ErrorBuffer"`
SleepOnErrorInterval uint32 `json:"SleepOnErrorInterval"` // sleep length on errors
EnableTLS bool `json:"EnableTLS"` // Use TLSConfig to create connections with AMQPS uri.
Expand Down Expand Up @@ -57,11 +59,12 @@ type ConsumerConfig struct {

// PublisherConfig represents settings for configuring global settings for all Publishers with ease.
type PublisherConfig struct {
SleepOnIdleInterval uint32 `json:"SleepOnIdleInterval"`
SleepOnErrorInterval uint32 `json:"SleepOnErrorInterval"`
LetterBuffer uint64 `json:"LetterBuffer"`
MaxOverBuffer uint64 `json:"MaxOverBuffer"`
NotificationBuffer uint32 `json:"NotificationBuffer"`
SleepOnIdleInterval uint32 `json:"SleepOnIdleInterval"`
SleepOnQueueFullInterval uint32 `json:"SleepOnQueueFullInterval"`
SleepOnErrorInterval uint32 `json:"SleepOnErrorInterval"`
LetterBuffer uint64 `json:"LetterBuffer"`
MaxOverBuffer uint64 `json:"MaxOverBuffer"`
NotificationBuffer uint32 `json:"NotificationBuffer"`
}

// TopologyConfig allows you to build simple toplogies from a JSON file.
Expand Down
Loading

0 comments on commit 5fbb4c0

Please sign in to comment.