diff --git a/Makefile b/Makefile
index 2768b58..104652a 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
 # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
 #
-# Generated on 2020-09-01T20:34:11Z by kres ee90a80-dirty.
+# Generated on 2020-09-09T15:36:30Z by kres 7e146df-dirty.
 
 # common variables
 
@@ -34,7 +34,7 @@ COMMON_ARGS += --build-arg=USERNAME=$(USERNAME)
 COMMON_ARGS += --build-arg=TOOLCHAIN=$(TOOLCHAIN)
 COMMON_ARGS += --build-arg=GOFUMPT_VERSION=$(GOFUMPT_VERSION)
 COMMON_ARGS += --build-arg=TESTPKGS=$(TESTPKGS)
-TOOLCHAIN ?= docker.io/golang:1.14-alpine
+TOOLCHAIN ?= docker.io/golang:1.15-alpine
 
 # help menu
 
diff --git a/hack/git-chglog/config.yaml b/hack/git-chglog/config.yaml
index dbb9735..04dc293 100644
--- a/hack/git-chglog/config.yaml
+++ b/hack/git-chglog/config.yaml
@@ -1,12 +1,12 @@
 # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
 #
-# Generated on 2020-09-01T20:34:11Z by kres ee90a80-dirty.
+# Generated on 2020-09-09T15:36:30Z by kres 7e146df-dirty.
 
 style: github
 template: CHANGELOG.tpl.md
 info:
   title: CHANGELOG
-  repository_url: https://github.com/talos-systems/talos
+  repository_url: https://github.com/talos-systems/go-loadbalancer
 options:
   commits:
     # filters:
diff --git a/loadbalancer/loadbalancer.go b/loadbalancer/loadbalancer.go
index ebf012e..6f6b7ba 100644
--- a/loadbalancer/loadbalancer.go
+++ b/loadbalancer/loadbalancer.go
@@ -7,6 +7,7 @@ package loadbalancer
 
 import (
 	"context"
+	"fmt"
 	"log"
 	"net"
 
@@ -25,6 +26,8 @@ import (
 // Usage: call Run() to start lb and wait for shutdown, call Close() to shutdown lb.
 type TCP struct {
 	tcpproxy.Proxy
+
+	routes map[string]*upstream.List
 }
 
 type lbUpstream string
@@ -76,6 +79,10 @@ func (target *lbTarget) HandleConn(conn net.Conn) {
 // TCP automatically does background health checks for the upstreams and picks only healthy
 // ones. Healthcheck is simple Dial attempt.
 func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstream.ListOption) error {
+	if t.routes == nil {
+		t.routes = make(map[string]*upstream.List)
+	}
+
 	upstreams := make([]upstream.Backend, len(upstreamAddrs))
 	for i := range upstreams {
 		upstreams[i] = lbUpstream(upstreamAddrs[i])
@@ -86,7 +93,30 @@ func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstrea
 		return err
 	}
 
+	t.routes[ipPort] = list
+
 	t.Proxy.AddRoute(ipPort, &lbTarget{list: list})
 
 	return nil
 }
+
+// ReconcileRoute updates the list of upstreamAddrs for the specified route (ipPort).
+func (t *TCP) ReconcileRoute(ipPort string, upstreamAddrs []string) error {
+	if t.routes == nil {
+		t.routes = make(map[string]*upstream.List)
+	}
+
+	list := t.routes[ipPort]
+	if list == nil {
+		return fmt.Errorf("handler not registered for %q", ipPort)
+	}
+
+	upstreams := make([]upstream.Backend, len(upstreamAddrs))
+	for i := range upstreams {
+		upstreams[i] = lbUpstream(upstreamAddrs[i])
+	}
+
+	list.Reconcile(upstreams)
+
+	return nil
+}
diff --git a/loadbalancer/loadbalancer_test.go b/loadbalancer/loadbalancer_test.go
index ffb7ba3..606c7e3 100644
--- a/loadbalancer/loadbalancer_test.go
+++ b/loadbalancer/loadbalancer_test.go
@@ -72,6 +72,102 @@ type TCPSuite struct {
 	suite.Suite
 }
 
+func (suite *TCPSuite) TestReconcile() {
+	const (
+		upstreamCount = 5
+		pivot         = 2
+	)
+
+	upstreams := make([]mockUpstream, upstreamCount)
+	for i := range upstreams {
+		upstreams[i].identity = strconv.Itoa(i)
+		suite.Require().NoError(upstreams[i].Start())
+	}
+
+	upstreamAddrs := make([]string, len(upstreams))
+	for i := range upstreamAddrs {
+		upstreamAddrs[i] = upstreams[i].addr
+	}
+
+	listenAddr, err := findListenAddress()
+	suite.Require().NoError(err)
+
+	lb := &loadbalancer.TCP{}
+	suite.Require().NoError(lb.AddRoute(
+		listenAddr,
+		upstreamAddrs[:pivot],
+		upstream.WithLowHighScores(-3, 3),
+		upstream.WithInitialScore(1),
+		upstream.WithScoreDeltas(-1, 1),
+		upstream.WithHealthcheckInterval(time.Second),
+		upstream.WithHealthcheckTimeout(100*time.Millisecond),
+	))
+
+	suite.Require().NoError(lb.Start())
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+
+		lb.Wait() //nolint: errcheck
+	}()
+
+	for i := 0; i < 5*pivot; i++ {
+		c, err := net.Dial("tcp", listenAddr)
+		suite.Require().NoError(err)
+
+		id, err := ioutil.ReadAll(c)
+		suite.Require().NoError(err)
+
+		// load balancer should go round-robin across all the upstreams [0:pivot]
+		suite.Assert().Equal([]byte(strconv.Itoa(i%pivot)), id)
+
+		suite.Require().NoError(c.Close())
+	}
+
+	// reconcile the list
+	suite.Require().NoError(lb.ReconcileRoute(listenAddr, upstreamAddrs[pivot:]))
+
+	// bring down pre-pivot upstreams
+	for i := 0; i < pivot; i++ {
+		upstreams[i].Close()
+	}
+
+	upstreamsUsed := map[int64]int{}
+
+	for i := 0; i < 10*(upstreamCount-pivot); i++ {
+		c, err := net.Dial("tcp", listenAddr)
+		suite.Require().NoError(err)
+
+		id, err := ioutil.ReadAll(c)
+		suite.Require().NoError(err)
+
+		// load balancer should go round-robin across all the upstreams [pivot:]
+		no, err := strconv.ParseInt(string(id), 10, 32)
+		suite.Require().NoError(err)
+
+		suite.Assert().Less(no, int64(upstreamCount))
+		suite.Assert().GreaterOrEqual(no, int64(pivot))
+		upstreamsUsed[no]++
+
+		suite.Require().NoError(c.Close())
+	}
+
+	for _, count := range upstreamsUsed {
+		suite.Assert().Equal(10, count)
+	}
+
+	suite.Require().NoError(lb.Close())
+	wg.Wait()
+
+	for i := range upstreams {
+		upstreams[i].Close()
+	}
+}
+
 func (suite *TCPSuite) TestBalancer() {
 	const (
 		upstreamCount = 5
diff --git a/upstream/upstream.go b/upstream/upstream.go
index 07e2e67..7d8e773 100644
--- a/upstream/upstream.go
+++ b/upstream/upstream.go
@@ -159,6 +159,39 @@ func NewList(upstreams []Backend, options ...ListOption) (*List, error) {
 	return list, nil
 }
 
+// Reconcile the list of backends with the passed list.
+//
+// Any new backends are added with the initial score, the score is kept for backends
+// which stay in the list, and backends missing from the new list are removed.
+func (list *List) Reconcile(upstreams []Backend) {
+	newUpstreams := make(map[Backend]struct{}, len(upstreams))
+
+	for _, upstream := range upstreams {
+		newUpstreams[upstream] = struct{}{}
+	}
+
+	list.mu.Lock()
+	defer list.mu.Unlock()
+
+	for i := 0; i < len(list.nodes); i++ {
+		if _, exists := newUpstreams[list.nodes[i].backend]; exists {
+			delete(newUpstreams, list.nodes[i].backend)
+
+			continue
+		}
+
+		list.nodes = append(list.nodes[:i], list.nodes[i+1:]...)
+		i--
+	}
+
+	for upstream := range newUpstreams {
+		list.nodes = append(list.nodes, node{
+			backend: upstream,
+			score:   list.initialScore,
+		})
+	}
+}
+
 // Shutdown stops healthchecks.
 func (list *List) Shutdown() {
 	list.healthCtxCancel()
diff --git a/upstream/upstream_test.go b/upstream/upstream_test.go
index f9c7d00..ad5ff66 100644
--- a/upstream/upstream_test.go
+++ b/upstream/upstream_test.go
@@ -187,6 +187,98 @@ func (suite *ListSuite) TestHealthcheck() {
 	}))
 }
 
+func (suite *ListSuite) TestReconcile() {
+	l, err := upstream.NewList(
+		[]upstream.Backend{
+			mockBackend("one"),
+			mockBackend("two"),
+			mockBackend("three"),
+		},
+		upstream.WithLowHighScores(-3, 3),
+		upstream.WithInitialScore(1),
+		upstream.WithScoreDeltas(-1, 1),
+		upstream.WithHealthcheckInterval(time.Hour),
+	)
+	suite.Require().NoError(err)
+
+	defer l.Shutdown()
+
+	backend, err := l.Pick()
+	suite.Assert().Equal(mockBackend("one"), backend)
+	suite.Assert().NoError(err)
+
+	l.Reconcile([]upstream.Backend{
+		mockBackend("one"),
+		mockBackend("two"),
+		mockBackend("three"),
+	})
+
+	backend, err = l.Pick()
+	suite.Assert().Equal(mockBackend("two"), backend)
+	suite.Assert().NoError(err)
+
+	l.Reconcile([]upstream.Backend{
+		mockBackend("one"),
+		mockBackend("two"),
+		mockBackend("four"),
+	})
+
+	backend, err = l.Pick()
+	suite.Assert().Equal(mockBackend("four"), backend)
+	suite.Assert().NoError(err)
+
+	l.Reconcile([]upstream.Backend{
+		mockBackend("five"),
+		mockBackend("six"),
+		mockBackend("four"),
+	})
+
+	backend, err = l.Pick()
+	suite.Assert().Equal(mockBackend("four"), backend)
+	suite.Assert().NoError(err)
+
+	backend, err = l.Pick()
+	suite.Assert().Equal(mockBackend("five"), backend)
+	suite.Assert().NoError(err)
+
+	backend, err = l.Pick()
+	suite.Assert().Equal(mockBackend("six"), backend)
+	suite.Assert().NoError(err)
+
+	l.Down(mockBackend("four")) // score == 2
+	l.Down(mockBackend("four")) // score == 1
+	l.Down(mockBackend("four")) // score == 0
+	l.Down(mockBackend("four")) // score == -1
+
+	backend, err = l.Pick()
+	suite.Assert().Equal(mockBackend("five"), backend)
+	suite.Assert().NoError(err)
+
+	backend, err = l.Pick()
+	suite.Assert().Equal(mockBackend("six"), backend)
+	suite.Assert().NoError(err)
+
+	l.Reconcile([]upstream.Backend{
+		mockBackend("five"),
+		mockBackend("six"),
+		mockBackend("four"),
+	})
+
+	backend, err = l.Pick()
+	suite.Assert().Equal(mockBackend("five"), backend)
+	suite.Assert().NoError(err)
+
+	backend, err = l.Pick()
+	suite.Assert().Equal(mockBackend("six"), backend)
+	suite.Assert().NoError(err)
+
+	l.Reconcile(nil)
+
+	backend, err = l.Pick()
+	suite.Assert().Nil(backend)
+	suite.Assert().EqualError(err, "no upstreams available")
+}
+
 func TestListSuite(t *testing.T) {
 	suite.Run(t, new(ListSuite))
 }
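
Note (not part of the patch): a minimal sketch of how the new ReconcileRoute API could be driven from a service-discovery loop. The import path follows the repository_url above; the listen address, the upstream addresses, and the discoverUpstreams helper are made-up placeholders, not code from this repository.

package main

import (
	"log"
	"time"

	"github.com/talos-systems/go-loadbalancer/loadbalancer"
)

// discoverUpstreams is a hypothetical stand-in for whatever produces the
// current set of backend addresses (DNS, a control plane API, a config file, ...).
func discoverUpstreams() []string {
	return []string{"10.5.0.2:6443", "10.5.0.3:6443"}
}

func main() {
	lb := &loadbalancer.TCP{}

	// Register the route once; healthy upstreams are picked round-robin.
	if err := lb.AddRoute("127.0.0.1:16443", discoverUpstreams()); err != nil {
		log.Fatal(err)
	}

	if err := lb.Start(); err != nil {
		log.Fatal(err)
	}

	// Periodically swap in the latest upstream list: backends which stay keep
	// their health score, new ones start at the initial score, missing ones are dropped.
	for range time.Tick(30 * time.Second) {
		if err := lb.ReconcileRoute("127.0.0.1:16443", discoverUpstreams()); err != nil {
			log.Printf("reconcile failed: %v", err)
		}
	}
}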