Skip to content

Commit

Permalink
Fix messenger locking issue
Browse files Browse the repository at this point in the history
  • Loading branch information
CoderCookE committed Nov 7, 2019
1 parent f97f396 commit b742b28
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 70 deletions.
14 changes: 12 additions & 2 deletions internal/connectionpool/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
type message struct {
health bool
backend string
proxy *httputil.ReverseProxy
}

type connection struct {
Expand Down Expand Up @@ -38,6 +39,7 @@ func newConnection(proxy *httputil.ReverseProxy, backend string, cache *ristrett
func (c *connection) get(w http.ResponseWriter, r *http.Request) error {
c.RLock()
defer c.RUnlock()

health := c.healthy
err := errors.New("Unhealthy Node")
if c.cache != nil {
Expand All @@ -61,11 +63,19 @@ func (c *connection) get(w http.ResponseWriter, r *http.Request) error {
func (c *connection) healthCheck() {
for {
healthy := <-c.messages

c.Lock()

health := healthy.health
backend := healthy.backend
c.Lock()
c.healthy = health
c.backend = backend
proxy := healthy.proxy

if proxy != nil && c.backend != backend {
c.backend = backend
c.proxy = proxy
}

c.Unlock()
}
}
16 changes: 7 additions & 9 deletions internal/connectionpool/health-checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"log"
"net/http"
"net/http/httputil"
"sync"
"time"
)
Expand Down Expand Up @@ -49,15 +50,13 @@ func (hc *healthChecker) Start() {
}
}

func (hc *healthChecker) Reuse(newBackend string) {
func (hc *healthChecker) Reuse(newBackend string, proxy *httputil.ReverseProxy) *healthChecker {
hc.Lock()
hc.backend = newBackend
hc.notifySubscribers(true, hc.backend)
hc.notifySubscribers(false, hc.backend, proxy)
hc.Unlock()
}

func (hc *healthChecker) Remove() {

return hc
}

func (hc *healthChecker) check(ctx context.Context) {
Expand Down Expand Up @@ -88,13 +87,12 @@ func (hc *healthChecker) check(ctx context.Context) {

if healthy != hc.currentHealth {
hc.currentHealth = healthy
hc.notifySubscribers(healthy, hc.backend)
hc.notifySubscribers(healthy, hc.backend, nil)
}
}

func (hc *healthChecker) notifySubscribers(healthy bool, backend string) {
message := message{health: healthy, backend: backend}

func (hc *healthChecker) notifySubscribers(healthy bool, backend string, proxy *httputil.ReverseProxy) {
message := message{health: healthy, backend: backend, proxy: proxy}
for _, c := range hc.subscribers {
c <- message
}
Expand Down
32 changes: 28 additions & 4 deletions internal/connectionpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,38 @@ func (p *pool) ListenForBackendChanges() {
}

added, removed := difference(currentBackends, updated)

for _, removedBackend := range removed {
if len(added) > 0 {
var new string
new, added = added[0], added[1:]
p.healthChecks[removedBackend].Reuse(new)

url, err := url.ParseRequestURI(new)
if err != nil {
log.Printf("Error adding backend, %s", new)
} else {
proxy := httputil.NewSingleHostReverseProxy(url)
proxy.Transport = p.client.Transport

if p.cacheEnabled {
cacheResponse := func(r *http.Response) error {
body, err := ioutil.ReadAll(r.Body)
cacheable := string(body)
r.Body = ioutil.NopCloser(bytes.NewBuffer(body))

path := r.Request.URL.Path
if err == nil {
p.cache.Set(path, cacheable, 1)
}

return nil
}
proxy.ModifyResponse = cacheResponse
}

newHC := p.healthChecks[removedBackend].Reuse(new, proxy)
p.healthChecks[new] = newHC
}
} else {
p.healthChecks[removedBackend].Remove()
p.healthChecks[removedBackend].Shutdown()
}

delete(p.healthChecks, removedBackend)
Expand All @@ -177,6 +200,7 @@ func (p *pool) ListenForBackendChanges() {

shuffle(poolConnections, p.connections)
}

}

func difference(original []string, updated []string) (added []string, removed []string) {
Expand Down
115 changes: 60 additions & 55 deletions internal/connectionpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/CoderCookE/goaround/internal/assert"
)
Expand Down Expand Up @@ -50,7 +51,9 @@ func TestFetch(t *testing.T) {

connectionPool := New(config)
defer connectionPool.Shutdown()

<-availableResChan
time.Sleep(200)
wg.Add(1)

for i := 0; i < 5; i++ {
Expand All @@ -71,61 +74,61 @@ func TestFetch(t *testing.T) {
})

t.Run("No cache", func(t *testing.T) {
// t.Run("fetches each request from server", func(t *testing.T) {
// callCount := 0
// availableResChan := make(chan bool, 1)
// wg := &sync.WaitGroup{}

// availableHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// var message []byte

// if r.URL.Path == "/health" {
// healthReponse := &healthCheckReponse{State: "healthy", Message: ""}
// message, _ = json.Marshal(healthReponse)

// availableResChan <- true
// }

// println(r.URL.Path)
// if r.URL.Path == "/foo" {
// wg.Done()
// callCount += 1
// message = []byte("hello")
// }

// w.Write(message)
// })

// availableServer := httptest.NewServer(availableHandler)
// defer availableServer.Close()

// backends := []string{availableServer.URL}
// config := &Config{
// Backends: backends,
// NumConns: 10,
// }

// connectionPool := New(config)
// defer connectionPool.Shutdown()
// <-availableResChan

// for i := 0; i < 5; i++ {
// wg.Add(1)
// reader := strings.NewReader("This is a test")
// request := httptest.NewRequest("GET", "http://www.test.com/foo", reader)
// recorder := httptest.NewRecorder()
// connectionPool.Fetch(recorder, request)

// wg.Wait()

// result, err := ioutil.ReadAll(recorder.Result().Body)
// assertion.Equal(err, nil)
// assertion.Equal(recorder.Code, http.StatusOK)
// assertion.Equal(string(result), "hello")
// }

// assertion.Equal(callCount, 5)
// })
t.Run("fetches each request from server", func(t *testing.T) {
callCount := 0
availableResChan := make(chan bool, 1)
wg := &sync.WaitGroup{}

availableHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var message []byte

if r.URL.Path == "/health" {
healthReponse := &healthCheckReponse{State: "healthy", Message: ""}
message, _ = json.Marshal(healthReponse)

availableResChan <- true
}

if r.URL.Path == "/foo" {
wg.Done()
callCount += 1
message = []byte("hello")
}

w.Write(message)
})

availableServer := httptest.NewServer(availableHandler)
defer availableServer.Close()

backends := []string{availableServer.URL}
config := &Config{
Backends: backends,
NumConns: 10,
}

connectionPool := New(config)
defer connectionPool.Shutdown()
<-availableResChan
time.Sleep(200)

for i := 0; i < 5; i++ {
wg.Add(1)
reader := strings.NewReader("This is a test")
request := httptest.NewRequest("GET", "http://www.test.com/foo", reader)
recorder := httptest.NewRecorder()
connectionPool.Fetch(recorder, request)

wg.Wait()

result, err := ioutil.ReadAll(recorder.Result().Body)
assertion.Equal(err, nil)
assertion.Equal(recorder.Code, http.StatusOK)
assertion.Equal(string(result), "hello")
}

assertion.Equal(callCount, 5)
})

t.Run("First connection tried is degraded, Uses next connections", func(t *testing.T) {
callCount := 0
Expand Down Expand Up @@ -170,6 +173,8 @@ func TestFetch(t *testing.T) {
defer connectionPool.Shutdown()

<-availableResChan
time.Sleep(200)

reader := strings.NewReader("This is a test")
request := httptest.NewRequest("GET", "http://www.test.com/foo", reader)
recorder := httptest.NewRecorder()
Expand Down

0 comments on commit b742b28

Please sign in to comment.