Skip to content

Commit

Permalink
Connect: Fix Envoy getting stuck during load
Browse files Browse the repository at this point in the history
Also in this PR:
 - Enable outlier detection on upstreams, which marks instances unhealthy after 5 failures (using Envoy's defaults)
 - Enable weighted load balancing where DNS weights are configured
  • Loading branch information
banks committed Mar 15, 2019
1 parent 33d0922 commit a6af4f6
Show file tree
Hide file tree
Showing 8 changed files with 344 additions and 5 deletions.
21 changes: 20 additions & 1 deletion agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -101,9 +103,24 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap
}

if c == nil {
conTimeout := 5 * time.Second
if toRaw, ok := upstream.Config["connect_timeout_ms"]; ok {
switch v := toRaw.(type) {
case string:
if ms, err := strconv.Atoi(v); err == nil {
conTimeout = time.Duration(ms) * time.Millisecond
}
case float64: // This is what parsing from JSON results in
conTimeout = time.Duration(v) * time.Millisecond
// Not sure if this can ever really happen but just in case it does in
// some test code...
case int:
conTimeout = time.Duration(v) * time.Millisecond
}
}
c = &envoy.Cluster{
Name: upstream.Identifier(),
ConnectTimeout: 5 * time.Second,
ConnectTimeout: conTimeout,
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
Expand All @@ -112,6 +129,8 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap
},
},
},
// Having an empty config enables outlier detection with default config.
OutlierDetection: &cluster.OutlierDetection{},
}
}

Expand Down
76 changes: 76 additions & 0 deletions agent/xds/clusters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package xds

import (
"testing"
"time"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)

func Test_makeUpstreamCluster(t *testing.T) {
tests := []struct {
name string
snap proxycfg.ConfigSnapshot
upstream structs.Upstream
want *envoy.Cluster
}{
{
name: "timeout override",
snap: proxycfg.ConfigSnapshot{},
upstream: structs.TestUpstreams(t)[0],
want: &envoy.Cluster{
Name: "service:db",
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
ConnectTimeout: 1 * time.Second, // TestUpstreams overrides to 1000ms
OutlierDetection: &cluster.OutlierDetection{},
TlsContext: &envoyauth.UpstreamTlsContext{
CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}),
},
},
},
{
name: "timeout default",
snap: proxycfg.ConfigSnapshot{},
upstream: structs.TestUpstreams(t)[1],
want: &envoy.Cluster{
Name: "prepared_query:geo-cache",
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
ConnectTimeout: 5 * time.Second, // Default
OutlierDetection: &cluster.OutlierDetection{},
TlsContext: &envoyauth.UpstreamTlsContext{
CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
got, err := makeUpstreamCluster(tt.upstream, &tt.snap)
require.NoError(err)

require.Equal(tt.want, got)
})
}
}
28 changes: 28 additions & 0 deletions agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/gogo/protobuf/proto"

Expand Down Expand Up @@ -43,10 +44,37 @@ func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes)
if addr == "" {
addr = ep.Node.Address
}
healthStatus := core.HealthStatus_HEALTHY
weight := 1
if ep.Service.Weights != nil {
weight = ep.Service.Weights.Passing
}
for _, chk := range ep.Checks {
if chk.Status == "critical" {
// This can't actually happen now because health always filters critical
// but in the future it may not so set this correctly!
healthStatus = core.HealthStatus_UNHEALTHY
}
if chk.Status == "warning" && ep.Service.Weights != nil {
weight = ep.Service.Weights.Warning
}
}
// Make weights fit Envoy's limits. A zero weight means that either Warning
// (likely) or Passing (weirdly) weight has been set to 0 effectively making
// this instance unhealthy and should not be sent traffic.
if weight < 1 {
healthStatus = core.HealthStatus_UNHEALTHY
weight = 1
}
if weight > 128 {
weight = 128
}
es = append(es, envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(addr, ep.Service.Port),
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),
})
}
return &envoy.ClusterLoadAssignment{
Expand Down
193 changes: 193 additions & 0 deletions agent/xds/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package xds

import (
"testing"

"github.com/mitchellh/copystructure"

"github.com/stretchr/testify/require"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/hashicorp/consul/agent/structs"
)

// Test_makeLoadAssignment checks the load assignment built from a set of
// service instances: none at all, instances without weights, instances with
// configured weights, and weighted instances that have a check in the warning
// state.
func Test_makeLoadAssignment(t *testing.T) {
	// deepCopy clones a fixture so a derived fixture can be mutated without
	// touching the one it came from.
	deepCopy := func(src structs.CheckServiceNodes) structs.CheckServiceNodes {
		dup, err := copystructure.Copy(src)
		require.NoError(t, err)
		return dup.(structs.CheckServiceNodes)
	}

	// Two passing instances of "web" with no weights configured.
	plainNodes := structs.CheckServiceNodes{
		{
			Node: &structs.Node{
				ID:         "node1-id",
				Node:       "node1",
				Address:    "10.10.10.10",
				Datacenter: "dc1",
			},
			Service: &structs.NodeService{
				Service: "web",
				Port:    1234,
			},
			Checks: structs.HealthChecks{
				{
					Node:    "node1",
					CheckID: "serfHealth",
					Status:  "passing",
				},
				{
					Node:      "node1",
					ServiceID: "web",
					CheckID:   "web:check",
					Status:    "passing",
				},
			},
		},
		{
			Node: &structs.Node{
				ID:         "node2-id",
				Node:       "node2",
				Address:    "10.10.10.20",
				Datacenter: "dc1",
			},
			Service: &structs.NodeService{
				Service: "web",
				Port:    1234,
			},
			Checks: structs.HealthChecks{
				{
					Node:    "node2",
					CheckID: "serfHealth",
					Status:  "passing",
				},
				{
					Node:      "node2",
					ServiceID: "web",
					CheckID:   "web:check",
					Status:    "passing",
				},
			},
		},
	}

	// Same instances, now with explicit passing/warning weights.
	weightedNodes := deepCopy(plainNodes)
	weightedNodes[0].Service.Weights = &structs.Weights{Passing: 10, Warning: 1}
	weightedNodes[1].Service.Weights = &structs.Weights{Passing: 5, Warning: 0}

	// Weighted instances whose first check has dropped to warning.
	warningNodes := deepCopy(weightedNodes)
	for i := range warningNodes {
		warningNodes[i].Checks[0].Status = "warning"
	}

	// lbEndpoint builds one expected endpoint on the fixtures' shared port.
	lbEndpoint := func(ip string, status core.HealthStatus, weight int) envoyendpoint.LbEndpoint {
		return envoyendpoint.LbEndpoint{
			Endpoint: &envoyendpoint.Endpoint{
				Address: makeAddressPtr(ip, 1234),
			},
			HealthStatus:        status,
			LoadBalancingWeight: makeUint32Value(weight),
		}
	}

	// assignment wraps the endpoints in the expected single-locality load
	// assignment. Appending to an empty literal keeps the slice non-nil even
	// when no endpoints are given, which matters for the equality check.
	assignment := func(eps ...envoyendpoint.LbEndpoint) *envoy.ClusterLoadAssignment {
		return &envoy.ClusterLoadAssignment{
			ClusterName: "service:test",
			Endpoints: []envoyendpoint.LocalityLbEndpoints{{
				LbEndpoints: append([]envoyendpoint.LbEndpoint{}, eps...),
			}},
		}
	}

	cases := []struct {
		name        string
		clusterName string
		endpoints   structs.CheckServiceNodes
		want        *envoy.ClusterLoadAssignment
	}{
		{
			name:        "no instances",
			clusterName: "service:test",
			endpoints:   structs.CheckServiceNodes{},
			want:        assignment(),
		},
		{
			name:        "instances, no weights",
			clusterName: "service:test",
			endpoints:   plainNodes,
			want: assignment(
				lbEndpoint("10.10.10.10", core.HealthStatus_HEALTHY, 1),
				lbEndpoint("10.10.10.20", core.HealthStatus_HEALTHY, 1),
			),
		},
		{
			name:        "instances, healthy weights",
			clusterName: "service:test",
			endpoints:   weightedNodes,
			want: assignment(
				lbEndpoint("10.10.10.10", core.HealthStatus_HEALTHY, 10),
				lbEndpoint("10.10.10.20", core.HealthStatus_HEALTHY, 5),
			),
		},
		{
			name:        "instances, warning weights",
			clusterName: "service:test",
			endpoints:   warningNodes,
			want: assignment(
				lbEndpoint("10.10.10.10", core.HealthStatus_HEALTHY, 1),
				lbEndpoint("10.10.10.20", core.HealthStatus_UNHEALTHY, 1),
			),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.want, makeLoadAssignment(tc.clusterName, tc.endpoints))
		})
	}
}
3 changes: 3 additions & 0 deletions agent/xds/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ func makeCommonTLSContext(cfgSnap *proxycfg.ConfigSnapshot) *envoyauth.CommonTls
// Concatenate all the root PEMs into one.
// TODO(banks): verify this actually works with Envoy (docs are not clear).
rootPEMS := ""
if cfgSnap.Roots == nil {
return nil
}
for _, root := range cfgSnap.Roots.Roots {
rootPEMS += root.RootCert
}
Expand Down
5 changes: 5 additions & 0 deletions agent/xds/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
prototypes "github.com/gogo/protobuf/types"
)

func createResponse(typeURL string, version, nonce string, resources []proto.Message) (*envoy.DiscoveryResponse, error) {
Expand Down Expand Up @@ -52,3 +53,7 @@ func makeAddressPtr(ip string, port int) *envoycore.Address {
a := makeAddress(ip, port)
return &a
}

// makeUint32Value wraps n in a protobuf UInt32Value message.
//
// Inputs outside the uint32 range are clamped instead of being allowed to wrap
// around: negative values become 0 and values above the largest uint32 become
// that maximum. A bare uint32(n) conversion would silently turn -1 into
// 4294967295.
func makeUint32Value(n int) *prototypes.UInt32Value {
	const maxUint32 = 1<<32 - 1

	var v uint32
	switch {
	case n < 0:
		v = 0
	case int64(n) > maxUint32:
		// Compare as int64 so the constant also fits where int is 32 bits.
		v = maxUint32
	default:
		v = uint32(n)
	}
	return &prototypes.UInt32Value{Value: v}
}
Loading

0 comments on commit a6af4f6

Please sign in to comment.