Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connect: Fix Envoy getting stuck during load #5499

Merged
merged 4 commits into from
Mar 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -86,6 +88,25 @@ func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
return c, err
}

func parseTimeMillis(ms interface{}) (time.Duration, error) {
switch v := ms.(type) {
case string:
ms, err := strconv.Atoi(v)
if err != nil {
return 0, err
}
return time.Duration(ms) * time.Millisecond, nil

case float64: // This is what parsing from JSON results in
return time.Duration(v) * time.Millisecond, nil
// Not sure if this can ever really happen but just in case it does in
// some test code...
case int:
return time.Duration(v) * time.Millisecond, nil
}
return 0, errors.New("invalid type for millisecond duration")
}

func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
var c *envoy.Cluster
var err error
Expand All @@ -101,9 +122,15 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap
}

if c == nil {
conTimeout := 5 * time.Second
if toRaw, ok := upstream.Config["connect_timeout_ms"]; ok {
if ms, err := parseTimeMillis(toRaw); err == nil {
conTimeout = ms
}
}
c = &envoy.Cluster{
Name: upstream.Identifier(),
ConnectTimeout: 5 * time.Second,
ConnectTimeout: conTimeout,
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
Expand All @@ -112,6 +139,8 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap
},
},
},
// Having an empty config enables outlier detection with default config.
OutlierDetection: &envoycluster.OutlierDetection{},
}
}

Expand Down
76 changes: 76 additions & 0 deletions agent/xds/clusters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package xds

import (
"testing"
"time"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)

func Test_makeUpstreamCluster(t *testing.T) {
tests := []struct {
name string
snap proxycfg.ConfigSnapshot
upstream structs.Upstream
want *envoy.Cluster
}{
{
name: "timeout override",
snap: proxycfg.ConfigSnapshot{},
upstream: structs.TestUpstreams(t)[0],
want: &envoy.Cluster{
Name: "service:db",
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
ConnectTimeout: 1 * time.Second, // TestUpstreams overrides to 1000ms
OutlierDetection: &cluster.OutlierDetection{},
TlsContext: &envoyauth.UpstreamTlsContext{
CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}),
},
},
},
{
name: "timeout default",
snap: proxycfg.ConfigSnapshot{},
upstream: structs.TestUpstreams(t)[1],
want: &envoy.Cluster{
Name: "prepared_query:geo-cache",
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
ConnectTimeout: 5 * time.Second, // Default
OutlierDetection: &cluster.OutlierDetection{},
TlsContext: &envoyauth.UpstreamTlsContext{
CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
got, err := makeUpstreamCluster(tt.upstream, &tt.snap)
require.NoError(err)

require.Equal(tt.want, got)
})
}
}
33 changes: 30 additions & 3 deletions agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"errors"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/gogo/protobuf/proto"

"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)

// endpointsFromSnapshot returns the xDS API representation of the "endpoints"
Expand All @@ -19,9 +21,6 @@ func endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]pr
}
resources := make([]proto.Message, 0, len(cfgSnap.UpstreamEndpoints))
for id, endpoints := range cfgSnap.UpstreamEndpoints {
if len(endpoints) < 1 {
continue
}
la := makeLoadAssignment(id, endpoints)
resources = append(resources, la)
}
Expand All @@ -43,10 +42,38 @@ func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes)
if addr == "" {
addr = ep.Node.Address
}
healthStatus := envoycore.HealthStatus_HEALTHY
weight := 1
if ep.Service.Weights != nil {
weight = ep.Service.Weights.Passing
}

for _, chk := range ep.Checks {
if chk.Status == api.HealthCritical {
// This can't actually happen now because health always filters critical
// but in the future it may not so set this correctly!
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
weight = ep.Service.Weights.Warning
}
}
// Make weights fit Envoy's limits. A zero weight means that either Warning
// (likely) or Passing (weirdly) weight has been set to 0 effectively making
// this instance unhealthy and should not be sent traffic.
if weight < 1 {
healthStatus = envoycore.HealthStatus_UNHEALTHY
weight = 1
}
if weight > 128 {
weight = 128
}
es = append(es, envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(addr, ep.Service.Port),
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),
})
}
return &envoy.ClusterLoadAssignment{
Expand Down
193 changes: 193 additions & 0 deletions agent/xds/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package xds

import (
"testing"

"github.com/mitchellh/copystructure"

"github.com/stretchr/testify/require"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/hashicorp/consul/agent/structs"
)

func Test_makeLoadAssignment(t *testing.T) {

testCheckServiceNodes := structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{
ID: "node1-id",
Node: "node1",
Address: "10.10.10.10",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: "web",
Port: 1234,
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "node1",
CheckID: "serfHealth",
Status: "passing",
},
&structs.HealthCheck{
Node: "node1",
ServiceID: "web",
CheckID: "web:check",
Status: "passing",
},
},
},
structs.CheckServiceNode{
Node: &structs.Node{
ID: "node2-id",
Node: "node2",
Address: "10.10.10.20",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: "web",
Port: 1234,
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "node2",
CheckID: "serfHealth",
Status: "passing",
},
&structs.HealthCheck{
Node: "node2",
ServiceID: "web",
CheckID: "web:check",
Status: "passing",
},
},
},
}

testWeightedCheckServiceNodesRaw, err := copystructure.Copy(testCheckServiceNodes)
require.NoError(t, err)
testWeightedCheckServiceNodes := testWeightedCheckServiceNodesRaw.(structs.CheckServiceNodes)

testWeightedCheckServiceNodes[0].Service.Weights = &structs.Weights{
Passing: 10,
Warning: 1,
}
testWeightedCheckServiceNodes[1].Service.Weights = &structs.Weights{
Passing: 5,
Warning: 0,
}

testWarningCheckServiceNodesRaw, err := copystructure.Copy(testWeightedCheckServiceNodes)
require.NoError(t, err)
testWarningCheckServiceNodes := testWarningCheckServiceNodesRaw.(structs.CheckServiceNodes)

testWarningCheckServiceNodes[0].Checks[0].Status = "warning"
testWarningCheckServiceNodes[1].Checks[0].Status = "warning"

tests := []struct {
name string
clusterName string
endpoints structs.CheckServiceNodes
want *envoy.ClusterLoadAssignment
}{
{
name: "no instances",
clusterName: "service:test",
endpoints: structs.CheckServiceNodes{},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{},
}},
},
},
{
name: "instances, no weights",
clusterName: "service:test",
endpoints: testCheckServiceNodes,
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.10", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.20", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
},
}},
},
},
{
name: "instances, healthy weights",
clusterName: "service:test",
endpoints: testWeightedCheckServiceNodes,
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.10", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(10),
},
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.20", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(5),
},
},
}},
},
},
{
name: "instances, warning weights",
clusterName: "service:test",
endpoints: testWarningCheckServiceNodes,
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.10", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.20", 1234),
},
HealthStatus: core.HealthStatus_UNHEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
},
}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := makeLoadAssignment(tt.clusterName, tt.endpoints)
require.Equal(t, tt.want, got)
})
}
}
Loading