Skip to content

Commit

Permalink
feat: provide a way to configure logger for the loadbalancer
Browse files Browse the repository at this point in the history
It was using `log.Printf` by default which was spamming stdout in Sfyra.

Default is still to use same writer as `log` defaults.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed Sep 18, 2020
1 parent da8e987 commit 3c8f347
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
43 changes: 31 additions & 12 deletions loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,22 @@ import (
type TCP struct {
tcpproxy.Proxy

Logger *log.Logger

routes map[string]*upstream.List
}

type lbUpstream string
type lbUpstream struct {
upstream string
logger *log.Logger
}

func (upstream lbUpstream) HealthCheck(ctx context.Context) error {
d := net.Dialer{}

c, err := d.DialContext(ctx, "tcp", string(upstream))
c, err := d.DialContext(ctx, "tcp", upstream.upstream)
if err != nil {
log.Printf("healthcheck failed for %q: %s", string(upstream), err)
upstream.logger.Printf("healthcheck failed for %q: %s", upstream.upstream, err)

return err
}
Expand All @@ -46,27 +51,28 @@ func (upstream lbUpstream) HealthCheck(ctx context.Context) error {
}

type lbTarget struct {
list *upstream.List
list *upstream.List
logger *log.Logger
}

func (target *lbTarget) HandleConn(conn net.Conn) {
upstreamBackend, err := target.list.Pick()
if err != nil {
log.Printf("no upstreams available, closing connection from %s", conn.RemoteAddr())
target.logger.Printf("no upstreams available, closing connection from %s", conn.RemoteAddr())
conn.Close() //nolint: errcheck

return
}

upstreamAddr := upstreamBackend.(lbUpstream) //nolint: errcheck
upstream := upstreamBackend.(lbUpstream) //nolint: errcheck

log.Printf("proxying connection %s -> %s", conn.RemoteAddr(), string(upstreamAddr))
target.logger.Printf("proxying connection %s -> %s", conn.RemoteAddr(), upstream.upstream)

upstreamTarget := tcpproxy.To(string(upstreamAddr))
upstreamTarget := tcpproxy.To(upstream.upstream)
upstreamTarget.OnDialError = func(src net.Conn, dstDialErr error) {
src.Close() //nolint: errcheck

log.Printf("error dialing upstream %s: %s", string(upstreamAddr), dstDialErr)
target.logger.Printf("error dialing upstream %s: %s", upstream.upstream, dstDialErr)

target.list.Down(upstreamBackend)
}
Expand All @@ -79,13 +85,20 @@ func (target *lbTarget) HandleConn(conn net.Conn) {
// TCP automatically does background health checks for the upstreams and picks only healthy
// ones. Healthcheck is simple Dial attempt.
func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstream.ListOption) error {
if t.Logger == nil {
t.Logger = log.New(log.Writer(), "", log.Flags())
}

if t.routes == nil {
t.routes = make(map[string]*upstream.List)
}

upstreams := make([]upstream.Backend, len(upstreamAddrs))
for i := range upstreams {
upstreams[i] = lbUpstream(upstreamAddrs[i])
upstreams[i] = lbUpstream{
upstream: upstreamAddrs[i],
logger: t.Logger,
}
}

list, err := upstream.NewList(upstreams, options...)
Expand All @@ -95,7 +108,10 @@ func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstrea

t.routes[ipPort] = list

t.Proxy.AddRoute(ipPort, &lbTarget{list: list})
t.Proxy.AddRoute(ipPort, &lbTarget{
list: list,
logger: t.Logger,
})

return nil
}
Expand All @@ -113,7 +129,10 @@ func (t *TCP) ReconcileRoute(ipPort string, upstreamAddrs []string) error {

upstreams := make([]upstream.Backend, len(upstreamAddrs))
for i := range upstreams {
upstreams[i] = lbUpstream(upstreamAddrs[i])
upstreams[i] = lbUpstream{
upstream: upstreamAddrs[i],
logger: t.Logger,
}
}

list.Reconcile(upstreams)
Expand Down
2 changes: 2 additions & 0 deletions loadbalancer/loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func (suite *TCPSuite) TestReconcile() {
no, err := strconv.ParseInt(string(id), 10, 32)
suite.Require().NoError(err)

suite.Assert().EqualValues(no, pivot+(i+pivot)%(upstreamCount-pivot))

suite.Assert().Less(no, int64(upstreamCount))
suite.Assert().GreaterOrEqual(no, int64(pivot))
upstreamsUsed[no]++
Expand Down
8 changes: 7 additions & 1 deletion upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,13 @@ func (list *List) Reconcile(upstreams []Backend) {
i--
}

for upstream := range newUpstreams {
// make insert order predictable by going over the list once again,
// as iterating over the map might lead to unpredictable order
for _, upstream := range upstreams {
if _, ok := newUpstreams[upstream]; !ok {
continue
}

list.nodes = append(list.nodes, node{
backend: upstream,
score: list.initialScore,
Expand Down

0 comments on commit 3c8f347

Please sign in to comment.