Skip to content

Commit

Permalink
Automatically remove target after persistent failures
Browse files Browse the repository at this point in the history
  • Loading branch information
rb3ckers committed Feb 19, 2019
1 parent 5a9e8d5 commit 7f2b6bc
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 38 deletions.
101 changes: 84 additions & 17 deletions datatypes/MirrorTargets.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,107 @@
package datatypes

import "sync"
import (
"log"
"sync"
"time"

"github.com/sony/gobreaker"
)

// MirrorTargets has the set of target URLs to mirror to
// It protects these for multi threaded access
type MirrorTargets struct {
sync.RWMutex
targets map[string]bool
targets map[string]*targetState
persistentFailureTimeout time.Duration
retryAfter time.Duration
}

type MirrorSettings struct {
PersistentFailureTimeout time.Duration
RetryAfter time.Duration
}

func NewMirrorTargets() *MirrorTargets {
type targetState struct {
sync.Mutex
firstFailure time.Time
persistentFailureTimeout time.Duration
circuitBreaker *gobreaker.CircuitBreaker
onTargetFailed func(target string)
}

func NewMirrorTargets(settings MirrorSettings) *MirrorTargets {
return &MirrorTargets{
targets: make(map[string]bool),
targets: make(map[string]*targetState),
persistentFailureTimeout: settings.PersistentFailureTimeout,
retryAfter: settings.RetryAfter,
}
}

func (me *MirrorTargets) Add(targets []string) {
me.Lock()
func (mt *MirrorTargets) Add(targets []string) {
log.Printf("Adding %s to targets list.", targets)
mt.Lock()
defer mt.Unlock()
for _, url := range targets {
me.targets[url] = true
mt.targets[url] = newTargetState(url, mt.persistentFailureTimeout, mt.retryAfter, func(target string) {
mt.Delete([]string{target})
})
}
me.Unlock()
}

func (me *MirrorTargets) Delete(targets []string) {
me.Lock()
func (mt *MirrorTargets) Delete(targets []string) {
log.Printf("Removing %s from targets list.", targets)
mt.Lock()
defer mt.Unlock()
for _, url := range targets {
delete(me.targets, url)
delete(mt.targets, url)
}
}

func (mt *MirrorTargets) ForEach(f func(string, *gobreaker.CircuitBreaker)) {
mt.RLock()
defer mt.RUnlock()
for url, target := range mt.targets {
f(url, target.circuitBreaker)
}
}

func (ts *targetState) onBreakerChange(name string, from gobreaker.State, to gobreaker.State) {
if from == gobreaker.StateClosed && to == gobreaker.StateOpen {
ts.Lock()
defer ts.Unlock()
ts.firstFailure = time.Now()
log.Printf("Temporary not mirroring to target %s.", name)
} else if to == gobreaker.StateOpen {
ts.Lock()
defer ts.Unlock()
if !ts.firstFailure.IsZero() && time.Now().Sub(ts.firstFailure) > ts.persistentFailureTimeout {
log.Printf("%s is persistently failing.", name)
ts.onTargetFailed(name)
}
} else if to == gobreaker.StateHalfOpen {
log.Printf("Retrying target %s.", name)
} else if to == gobreaker.StateClosed {
ts.Lock()
defer ts.Unlock()
ts.firstFailure = time.Time{}
log.Printf("Resuming mirroring to target %s.", name)
}
me.Unlock()
}

func (me *MirrorTargets) ForEach(f func(string)) {
me.RLock()
for url := range me.targets {
f(url)
func newTargetState(name string, persistentFailureTimeout time.Duration, retryAfter time.Duration, onTargetFailed func(target string)) *targetState {
targetState := &targetState{
circuitBreaker: nil,
onTargetFailed: onTargetFailed,
persistentFailureTimeout: persistentFailureTimeout,
}
settings := gobreaker.Settings{
Name: name,
MaxRequests: 1,
Interval: 0, // Never clear counts
Timeout: retryAfter, // When open retry after 60 seconds
OnStateChange: targetState.onBreakerChange,
}
me.RUnlock()
targetState.circuitBreaker = gobreaker.NewCircuitBreaker(settings)
return targetState
}
59 changes: 38 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,25 @@ import (
"strings"
"time"

"github.com/sony/gobreaker"

"github.com/rb3ckers/trafficmirror/datatypes"
)

var netClient = &http.Client{
Timeout: time.Second * 20,
}

var targets = datatypes.NewMirrorTargets()
var targets *datatypes.MirrorTargets

func main() {
listenAddress := flag.String("listen", ":8080", "Address to listen on and mirror traffic from")
proxyTarget := flag.String("main", "http://localhost:8888", "Main proxy target, its responses will be returned to the client")
targetsEndpoint := flag.String("targets", "targets", "Path on which additional targets to mirror to can be added/deleted/listed via PUT, DELETE and GET")
targetsAddress := flag.String("targetsAddress", "", "Address on which the targets endpoint is made available. Leave empty to expose it on the address that is being mirrored")
targetsAddress := flag.String("targets-address", "", "Address on which the targets endpoint is made available. Leave empty to expose it on the address that is being mirrored")
passwordFile := flag.String("password", "", "Provide a file that contains username/password to protect the configuration 'targets' endpoint. Contains 1 username/password combination separated by ':'.")
persistentFailureTimeout := flag.Duration("fail-after", time.Minute*30, "Remove a target when it has been failing for this duration.")
retryAfter := flag.Duration("retry-after", time.Minute*1, "After 5 successive failures a target is temporarily disabled, it will be retried after this timeout.")

help := flag.Bool("help", false, "Print help")

Expand All @@ -46,12 +50,24 @@ func main() {
}

fmt.Printf("Mirroring traffic from %s to %s\n", *listenAddress, *proxyTarget)
var targetsText string
if *targetsAddress != "" {
fmt.Printf("Add/remove/list mirror targets at http://%s/%s\n", *targetsAddress, *targetsEndpoint)
targetsText = fmt.Sprintf("http://%s/%s", *targetsAddress, *targetsEndpoint)
} else {
fmt.Printf("Add/remove/list mirror targets at http://%s/%s\n", *listenAddress, *targetsEndpoint)
targetsText = fmt.Sprintf("http://%s/%s", *listenAddress, *targetsEndpoint)
}

fmt.Printf("Add/remove/list mirror targets via PUT/DELETE/GET at %s:\n", targetsText)
fmt.Printf("List : curl %s\n", targetsText)
fmt.Printf("Add : curl -X PUT %s?url=http://localhost:5678\n", targetsText)
fmt.Printf("Remove: curl -X DELETE %s?url=http://localhost:5678\n", targetsText)
fmt.Println()

targets = datatypes.NewMirrorTargets(datatypes.MirrorSettings{
PersistentFailureTimeout: *persistentFailureTimeout,
RetryAfter: *retryAfter,
})

url, _ := url.Parse(*proxyTarget)

mirrorMux := http.NewServeMux()
Expand Down Expand Up @@ -108,30 +124,33 @@ func bufferRequest(req *http.Request) []byte {
}

func sendToMirrors(req *http.Request, body []byte) {
targets.ForEach(func(target string) {
go mirrorTo(target, req, body)
targets.ForEach(func(target string, breaker *gobreaker.CircuitBreaker) {
go mirrorTo(target, req, body, breaker)
})
}

func mirrorTo(targetURL string, req *http.Request, body []byte) {
url := fmt.Sprintf("%s%s", targetURL, req.RequestURI)
func mirrorTo(targetURL string, req *http.Request, body []byte, breaker *gobreaker.CircuitBreaker) {
breaker.Execute(func() (interface{}, error) {
url := fmt.Sprintf("%s%s", targetURL, req.RequestURI)

newRequest, _ := http.NewRequest(req.Method, url, bytes.NewReader(body))
newRequest.Header = req.Header
newRequest, _ := http.NewRequest(req.Method, url, bytes.NewReader(body))
newRequest.Header = req.Header

response, err := netClient.Do(newRequest)
if err != nil {
log.Printf("Error reading response: %v", err)
return
}
defer response.Body.Close()
// Drain the body, but discard it, to make sure connection can be reused
io.Copy(ioutil.Discard, response.Body)
response, err := netClient.Do(newRequest)
if err != nil {
log.Printf("Error reading response: %v", err)
return nil, err
}
defer response.Body.Close()
// Drain the body, but discard it, to make sure connection can be reused
io.Copy(ioutil.Discard, response.Body)
return nil, nil
})
}

func mirrorsHandler(res http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodGet {
targets.ForEach(func(target string) {
targets.ForEach(func(target string, breaker *gobreaker.CircuitBreaker) {
fmt.Fprintln(res, target)
})
return
Expand All @@ -147,10 +166,8 @@ func mirrorsHandler(res http.ResponseWriter, req *http.Request) {
}

if req.Method == http.MethodPut {
log.Printf("Adding '%s' to targets list.", targetURLs)
targets.Add(targetURLs)
} else if req.Method == http.MethodDelete {
log.Printf("Removing '%s' from targets list.", targetURLs)
targets.Delete(targetURLs)
}
}
Expand Down

0 comments on commit 7f2b6bc

Please sign in to comment.