Skip to content

Commit

Permalink
Connect: Fix Envoy getting stuck during load (#5499)
Browse files Browse the repository at this point in the history
* Connect: Fix Envoy getting stuck during load

Also in this PR:
 - Enabled outlier detection on upstreams which will mark instances unhealthy after 5 failures (using Envoy's defaults)
 - Enable weighted load balancing where DNS weights are configured

* Fix empty load assignments in the right place

* Fix import names from review

* Move millisecond parse to a helper function
  • Loading branch information
banks authored Mar 22, 2019
1 parent 6a78e2a commit 89fa5ec
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 8 deletions.
31 changes: 30 additions & 1 deletion agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -86,6 +88,25 @@ func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
return c, err
}

func parseTimeMillis(ms interface{}) (time.Duration, error) {
switch v := ms.(type) {
case string:
ms, err := strconv.Atoi(v)
if err != nil {
return 0, err
}
return time.Duration(ms) * time.Millisecond, nil

case float64: // This is what parsing from JSON results in
return time.Duration(v) * time.Millisecond, nil
// Not sure if this can ever really happen but just in case it does in
// some test code...
case int:
return time.Duration(v) * time.Millisecond, nil
}
return 0, errors.New("invalid type for millisecond duration")
}

func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
var c *envoy.Cluster
var err error
Expand All @@ -101,9 +122,15 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap
}

if c == nil {
conTimeout := 5 * time.Second
if toRaw, ok := upstream.Config["connect_timeout_ms"]; ok {
if ms, err := parseTimeMillis(toRaw); err == nil {
conTimeout = ms
}
}
c = &envoy.Cluster{
Name: upstream.Identifier(),
ConnectTimeout: 5 * time.Second,
ConnectTimeout: conTimeout,
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
Expand All @@ -112,6 +139,8 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap
},
},
},
// Having an empty config enables outlier detection with default config.
OutlierDetection: &envoycluster.OutlierDetection{},
}
}

Expand Down
76 changes: 76 additions & 0 deletions agent/xds/clusters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package xds

import (
"testing"
"time"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)

func Test_makeUpstreamCluster(t *testing.T) {
tests := []struct {
name string
snap proxycfg.ConfigSnapshot
upstream structs.Upstream
want *envoy.Cluster
}{
{
name: "timeout override",
snap: proxycfg.ConfigSnapshot{},
upstream: structs.TestUpstreams(t)[0],
want: &envoy.Cluster{
Name: "service:db",
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
ConnectTimeout: 1 * time.Second, // TestUpstreams overrides to 1000ms
OutlierDetection: &cluster.OutlierDetection{},
TlsContext: &envoyauth.UpstreamTlsContext{
CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}),
},
},
},
{
name: "timeout default",
snap: proxycfg.ConfigSnapshot{},
upstream: structs.TestUpstreams(t)[1],
want: &envoy.Cluster{
Name: "prepared_query:geo-cache",
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
ConnectTimeout: 5 * time.Second, // Default
OutlierDetection: &cluster.OutlierDetection{},
TlsContext: &envoyauth.UpstreamTlsContext{
CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
got, err := makeUpstreamCluster(tt.upstream, &tt.snap)
require.NoError(err)

require.Equal(tt.want, got)
})
}
}
33 changes: 30 additions & 3 deletions agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"errors"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/gogo/protobuf/proto"

"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)

// endpointsFromSnapshot returns the xDS API representation of the "endpoints"
Expand All @@ -19,9 +21,6 @@ func endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]pr
}
resources := make([]proto.Message, 0, len(cfgSnap.UpstreamEndpoints))
for id, endpoints := range cfgSnap.UpstreamEndpoints {
if len(endpoints) < 1 {
continue
}
la := makeLoadAssignment(id, endpoints)
resources = append(resources, la)
}
Expand All @@ -43,10 +42,38 @@ func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes)
if addr == "" {
addr = ep.Node.Address
}
healthStatus := envoycore.HealthStatus_HEALTHY
weight := 1
if ep.Service.Weights != nil {
weight = ep.Service.Weights.Passing
}

for _, chk := range ep.Checks {
if chk.Status == api.HealthCritical {
// This can't actually happen now because health always filters critical
// but in the future it may not so set this correctly!
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
weight = ep.Service.Weights.Warning
}
}
// Make weights fit Envoy's limits. A zero weight means that either Warning
// (likely) or Passing (weirdly) weight has been set to 0 effectively making
// this instance unhealthy and should not be sent traffic.
if weight < 1 {
healthStatus = envoycore.HealthStatus_UNHEALTHY
weight = 1
}
if weight > 128 {
weight = 128
}
es = append(es, envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(addr, ep.Service.Port),
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),
})
}
return &envoy.ClusterLoadAssignment{
Expand Down
193 changes: 193 additions & 0 deletions agent/xds/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package xds

import (
"testing"

"github.com/mitchellh/copystructure"

"github.com/stretchr/testify/require"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/hashicorp/consul/agent/structs"
)

func Test_makeLoadAssignment(t *testing.T) {

testCheckServiceNodes := structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{
ID: "node1-id",
Node: "node1",
Address: "10.10.10.10",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: "web",
Port: 1234,
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "node1",
CheckID: "serfHealth",
Status: "passing",
},
&structs.HealthCheck{
Node: "node1",
ServiceID: "web",
CheckID: "web:check",
Status: "passing",
},
},
},
structs.CheckServiceNode{
Node: &structs.Node{
ID: "node2-id",
Node: "node2",
Address: "10.10.10.20",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: "web",
Port: 1234,
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "node2",
CheckID: "serfHealth",
Status: "passing",
},
&structs.HealthCheck{
Node: "node2",
ServiceID: "web",
CheckID: "web:check",
Status: "passing",
},
},
},
}

testWeightedCheckServiceNodesRaw, err := copystructure.Copy(testCheckServiceNodes)
require.NoError(t, err)
testWeightedCheckServiceNodes := testWeightedCheckServiceNodesRaw.(structs.CheckServiceNodes)

testWeightedCheckServiceNodes[0].Service.Weights = &structs.Weights{
Passing: 10,
Warning: 1,
}
testWeightedCheckServiceNodes[1].Service.Weights = &structs.Weights{
Passing: 5,
Warning: 0,
}

testWarningCheckServiceNodesRaw, err := copystructure.Copy(testWeightedCheckServiceNodes)
require.NoError(t, err)
testWarningCheckServiceNodes := testWarningCheckServiceNodesRaw.(structs.CheckServiceNodes)

testWarningCheckServiceNodes[0].Checks[0].Status = "warning"
testWarningCheckServiceNodes[1].Checks[0].Status = "warning"

tests := []struct {
name string
clusterName string
endpoints structs.CheckServiceNodes
want *envoy.ClusterLoadAssignment
}{
{
name: "no instances",
clusterName: "service:test",
endpoints: structs.CheckServiceNodes{},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{},
}},
},
},
{
name: "instances, no weights",
clusterName: "service:test",
endpoints: testCheckServiceNodes,
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.10", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.20", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
},
}},
},
},
{
name: "instances, healthy weights",
clusterName: "service:test",
endpoints: testWeightedCheckServiceNodes,
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.10", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(10),
},
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.20", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(5),
},
},
}},
},
},
{
name: "instances, warning weights",
clusterName: "service:test",
endpoints: testWarningCheckServiceNodes,
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.10", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.20", 1234),
},
HealthStatus: core.HealthStatus_UNHEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
},
}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := makeLoadAssignment(tt.clusterName, tt.endpoints)
require.Equal(t, tt.want, got)
})
}
}
Loading

0 comments on commit 89fa5ec

Please sign in to comment.