Skip to content

Commit

Permalink
new: Upstream healthchecks - closed #49
Browse files Browse the repository at this point in the history
  • Loading branch information
fabiocicerchia committed Sep 7, 2021
1 parent 8303658 commit ce027fb
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 68 deletions.
2 changes: 1 addition & 1 deletion cache/engine/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"

"github.com/fabiocicerchia/go-proxy-cache/cache/engine/client"
"github.com/fabiocicerchia/go-proxy-cache/config"
Expand Down
6 changes: 4 additions & 2 deletions server/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (
"github.com/fabiocicerchia/go-proxy-cache/server/balancer/roundrobin"
)

var lb map[string]*roundrobin.Balancer
type LoadBalancing map[string]*roundrobin.Balancer

var lb LoadBalancing

// InitRoundRobin - Initialise the LB algorithm for round robin.
func InitRoundRobin(name string, endpoints []string) {
if len(lb) == 0 {
lb = make(map[string]*roundrobin.Balancer)
}

lb[name] = roundrobin.New(endpoints)
lb[name] = roundrobin.New(name, endpoints, true)
}

// GetLBRoundRobin - Returns backend server using RoundRobin algorithm.
Expand Down
117 changes: 106 additions & 11 deletions server/balancer/roundrobin/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,134 @@ package roundrobin

import (
"errors"
"fmt"
"net/http"
"sync"
"time"

log "github.com/sirupsen/logrus"
)

// TODO: make it customizable
const HealthCheckInterval time.Duration = 30 * time.Second

// ErrNoAvailableItem no item is available.
var ErrNoAvailableItem = errors.New("no item is available")

type Item struct {
healthy bool
endpoint string
}

// Balancer roundrobin instance.
type Balancer struct {
m sync.Mutex

next int
items []string
id string
next int
items []Item
logger log.Logger
}

// New - Creates a new instance.
func New(items []string) *Balancer {
return &Balancer{
m: sync.Mutex{},
next: 0,
items: items,
func New(name string, items []string, enableHealthchecks bool) *Balancer {
newItems := []Item{}
for _, v := range items {
item := Item{true, v}
newItems = append(newItems, item)
}

b := &Balancer{
id: name,
m: sync.Mutex{},
next: 0,
items: newItems,
logger: *log.StandardLogger(),
}

if enableHealthchecks {
b.CheckHealth()
}

return b
}

// GetHealthyNodes - Retrieves healthy nodes.
func (b Balancer) GetHealthyNodes() []Item {
healthyNodes := []Item{}

for _, v := range b.items {
if v.healthy {
healthyNodes = append(healthyNodes, v)
}
}

return healthyNodes
}

// Pick - Chooses next available item.
func (b *Balancer) Pick() (string, error) {
if len(b.items) == 0 {
b.m.Lock()
healthyNodes := b.GetHealthyNodes()
b.m.Unlock()

if len(healthyNodes) == 0 {
return "", ErrNoAvailableItem
}

b.m.Lock()
r := b.items[b.next]
b.next = (b.next + 1) % len(b.items)
r := healthyNodes[b.next]
b.next = (b.next + 1) % len(healthyNodes)
b.m.Unlock()

return r, nil
return r.endpoint, nil
}

// CheckHealth() - Period check on nodes status.
func (b *Balancer) CheckHealth() {
period := HealthCheckInterval // todo customize

go func() {
t := time.NewTicker(period)

for {
<-t.C

client := http.Client{
// return the 301/302
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
Timeout: 5 * time.Second, // TODO: make it custom
}

for k, v := range b.items {
endpointURL := fmt.Sprintf("http://%s", v.endpoint) // todo fix

req, err := http.NewRequest("GET", endpointURL, nil)
if err != nil {
b.logger.Errorf("Healthcheck request %s failed for %s: %s", b.id, endpointURL, err)
continue
}
res, err := client.Do(req)

v.healthy = err == nil
if err != nil {
b.logger.Errorf("Healthcheck %s failed for %s: %s", b.id, endpointURL, err)
} else {
v.healthy = res.StatusCode < http.StatusInternalServerError // todo customize status code

if !v.healthy {
b.logger.Errorf("Endpoint %s is not healthy (%d). ID: %s", endpointURL, res.StatusCode, b.id)
}
}
// v.healthy = true

b.m.Lock()
b.items[k] = v
b.m.Unlock()
}

}
}()
}
10 changes: 5 additions & 5 deletions server/balancer/roundrobin/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func initLogs() {
func TestPickEmpty(t *testing.T) {
initLogs()

b := roundrobin.New([]string{})
b := roundrobin.New("TestPickEmpty", []string{}, false)

value, err := b.Pick()

Expand All @@ -48,11 +48,11 @@ func TestPickEmpty(t *testing.T) {
func TestPickWithData(t *testing.T) {
initLogs()

b := roundrobin.New([]string{
b := roundrobin.New("TestPickWithData", []string{
"item1",
"item2",
"item3",
})
}, false)

value, err := b.Pick()

Expand All @@ -66,11 +66,11 @@ func TestPickWithData(t *testing.T) {
func TestPickCorrectness(t *testing.T) {
initLogs()

b := roundrobin.New([]string{
b := roundrobin.New("TestPickCorrectness", []string{
"item1",
"item2",
"item3",
})
}, false)

// first round (shuffling)
var value1, value2, value3, value4 interface{}
Expand Down
2 changes: 1 addition & 1 deletion server/handler/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"

"github.com/fabiocicerchia/go-proxy-cache/cache/engine"
"github.com/fabiocicerchia/go-proxy-cache/config"
Expand Down
6 changes: 1 addition & 5 deletions server/handler/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/fabiocicerchia/go-proxy-cache/server/response"
"github.com/fabiocicerchia/go-proxy-cache/server/storage"
"github.com/fabiocicerchia/go-proxy-cache/server/transport"
"github.com/fabiocicerchia/go-proxy-cache/utils/queue"
)

// CacheStatusHit - Value for HIT.
Expand Down Expand Up @@ -158,10 +157,7 @@ func (rc RequestCall) storeResponse() {
}

rc.GetLogger().Debugf("Async Store Response: %s", rc.Request.URL.String())
// go rc.doStoreResponse()
queue.Dispatcher.Do(func() {
doStoreResponse(rcDTO, rc.DomainConfig.Cache)
})
go doStoreResponse(rcDTO, rc.DomainConfig.Cache)
}

func doStoreResponse(rcDTO storage.RequestCallDTO, configCache config.Cache) {
Expand Down
11 changes: 3 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/fabiocicerchia/go-proxy-cache/server/logger"
srvtls "github.com/fabiocicerchia/go-proxy-cache/server/tls"
circuitbreaker "github.com/fabiocicerchia/go-proxy-cache/utils/circuit-breaker"
"github.com/fabiocicerchia/go-proxy-cache/utils/queue"
)

const enableTimeoutHandler = true
Expand All @@ -45,6 +44,8 @@ type Servers struct {
HTTPS map[string]Server
}

var servers *Servers

// Run - Starts the GoProxyCache servers' listeners.
func Run(configFile string) {
log.Infof("Starting...\n")
Expand All @@ -53,7 +54,7 @@ func Run(configFile string) {
config.InitConfigFromFileOrEnv(configFile)
config.Print()

servers := &Servers{
servers = &Servers{
HTTP: make(map[string]Server),
HTTPS: make(map[string]Server),
}
Expand All @@ -62,9 +63,6 @@ func Run(configFile string) {
servers.StartDomainServer(domain.Host, domain.Scheme)
}

// init queue
queue.Init()

// start server http & https
servers.startListeners()

Expand All @@ -81,9 +79,6 @@ func Run(configFile string) {
ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeoutShutdown)
defer cancel()

log.Error("Finishing processing queue...")
queue.Init()

log.Error("Shutting down servers...")
servers.shutdownServers(ctx)

Expand Down
35 changes: 0 additions & 35 deletions utils/queue/queue.go

This file was deleted.

0 comments on commit ce027fb

Please sign in to comment.