Skip to content

Commit

Permalink
autoregistration: allow bootstrap node to override locality
Browse files Browse the repository at this point in the history
During testing of VM autoregistration, we discovered that it is not
possible to override the WorkloadGroup template's locality from the
bootstrap node locality. The core issue is that istiod initializes the
proxy struct from the bootstrap node before passing it to the
autoregistration controller; however, the node's locality is not set on
the proxy at that time. This is fixed by passing the bootstrap node
along to autoregistration code in addition to the proxy.

Change-Id: Id9b1d2f0cc5d2cf9416670c26b0a490d302ea039
Reviewed-on: https://gerrit.musta.ch/c/public/istio/+/1391
Reviewed-by: Weibo He <weibo.he@airbnb.com>
Reviewed-by: Jungho Ahn <jungho.ahn@airbnb.com>
  • Loading branch information
S-Chan committed Jun 14, 2021
1 parent c267726 commit f2d9fe4
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 19 deletions.
15 changes: 8 additions & 7 deletions pilot/pkg/controller/workloadentry/workloadentry_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/cenkalti/backoff"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/gogo/protobuf/types"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -216,7 +217,7 @@ func setConnectMeta(c *config.Config, controller string, conTime time.Time) {
delete(c.Annotations, DisconnectedAtAnnotation)
}

func (c *Controller) RegisterWorkload(proxy *model.Proxy, conTime time.Time) error {
func (c *Controller) RegisterWorkload(proxy *model.Proxy, node *core.Node, conTime time.Time) error {
if !features.WorkloadEntryAutoRegistration || c == nil {
return nil
}
Expand All @@ -230,14 +231,14 @@ func (c *Controller) RegisterWorkload(proxy *model.Proxy, conTime time.Time) err
c.adsConnections[makeProxyKey(proxy)]++
c.mutex.Unlock()

if err := c.registerWorkload(entryName, proxy, conTime); err != nil {
if err := c.registerWorkload(entryName, proxy, node, conTime); err != nil {
log.Errorf(err)
return err
}
return nil
}

func (c *Controller) registerWorkload(entryName string, proxy *model.Proxy, conTime time.Time) error {
func (c *Controller) registerWorkload(entryName string, proxy *model.Proxy, node *core.Node, conTime time.Time) error {
wle := c.store.Get(gvk.WorkloadEntry, entryName, proxy.Metadata.Namespace)
if wle != nil {
lastConTime, _ := time.Parse(timeFormat, wle.Annotations[ConnectedAtAnnotation])
Expand All @@ -263,7 +264,7 @@ func (c *Controller) registerWorkload(entryName string, proxy *model.Proxy, conT
return fmt.Errorf("auto-registration WorkloadEntry of %v failed: cannot find WorkloadGroup %s/%s",
proxy.ID, proxy.Metadata.Namespace, proxy.Metadata.AutoRegisterGroup)
}
entry := workloadEntryFromGroup(entryName, proxy, groupCfg)
entry := workloadEntryFromGroup(entryName, proxy, node, groupCfg)
setConnectMeta(entry, c.instanceID, conTime)
_, err := c.store.Create(*entry)
if err != nil {
Expand Down Expand Up @@ -553,7 +554,7 @@ func mergeLabels(labels ...map[string]string) map[string]string {

var workloadGroupIsController = true

func workloadEntryFromGroup(name string, proxy *model.Proxy, groupCfg *config.Config) *config.Config {
func workloadEntryFromGroup(name string, proxy *model.Proxy, node *core.Node, groupCfg *config.Config) *config.Config {
group := groupCfg.Spec.(*v1alpha3.WorkloadGroup)
entry := group.Template.DeepCopy()
entry.Address = proxy.IPAddresses[0]
Expand All @@ -574,8 +575,8 @@ func workloadEntryFromGroup(name string, proxy *model.Proxy, groupCfg *config.Co
if proxy.Metadata.Network != "" {
entry.Network = proxy.Metadata.Network
}
if proxy.Locality != nil {
entry.Locality = util.LocalityToString(proxy.Locality)
if node.Locality != nil {
entry.Locality = util.LocalityToString(node.Locality)
}
if proxy.Metadata.ProxyConfig != nil && proxy.Metadata.ProxyConfig.ReadinessProbe != nil {
annotations[status.WorkloadEntryHealthCheckAnnotation] = "true"
Expand Down
52 changes: 41 additions & 11 deletions pilot/pkg/controller/workloadentry/workloadentry_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/go-multierror"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -31,6 +32,7 @@ import (
"istio.io/istio/pilot/pkg/config/memory"
"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/networking/util"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/collections"
"istio.io/istio/pkg/config/schema/gvk"
Expand All @@ -55,6 +57,8 @@ var (
Template: &v1alpha3.WorkloadEntry{
Ports: map[string]uint32{"http": 80},
Labels: map[string]string{"app": "a"},
Network: "nw0",
Locality: "reg0/zone0/subzone0",
Weight: 1,
ServiceAccount: "sa-a",
},
Expand Down Expand Up @@ -124,30 +128,32 @@ func TestAutoregistrationLifecycle(t *testing.T) {
p2 := fakeProxy("1.2.3.4", wgA, "nw2")
p3 := fakeProxy("1.2.3.5", wgA, "nw1")

n := fakeNode("reg1", "zone1", "subzone1")

// allows associating a Register call with Unregister
var origConnTime time.Time

t.Run("initial registration", func(t *testing.T) {
// simply make sure the entry exists after connecting
c1.RegisterWorkload(p, time.Now())
checkEntryOrFail(t, store, wgA, p, c1.instanceID)
checkEntryOrFail(t, store, wgA, p, n, c1.instanceID)
})
t.Run("multinetwork same ip", func(t *testing.T) {
// make sure we don't overwrite a similar entry for a different network
c2.RegisterWorkload(p2, time.Now())
checkEntryOrFail(t, store, wgA, p, c1.instanceID)
checkEntryOrFail(t, store, wgA, p2, c2.instanceID)
checkEntryOrFail(t, store, wgA, p, n, c1.instanceID)
checkEntryOrFail(t, store, wgA, p2, n, c2.instanceID)
})
t.Run("fast reconnect", func(t *testing.T) {
t.Run("same instance", func(t *testing.T) {
// disconnect, make sure entry is still there with disconnect meta
c1.QueueUnregisterWorkload(p, time.Now())
time.Sleep(features.WorkloadEntryCleanupGracePeriod / 2)
checkEntryOrFail(t, store, wgA, p, "")
checkEntryOrFail(t, store, wgA, p, n, "")
// reconnect, ensure entry is there with the same instance id
origConnTime = time.Now()
c1.RegisterWorkload(p, origConnTime)
checkEntryOrFail(t, store, wgA, p, c1.instanceID)
checkEntryOrFail(t, store, wgA, p, n, c1.instanceID)
})
t.Run("same instance: connect before disconnect ", func(t *testing.T) {
// reconnect, ensure entry is there with the same instance id
Expand All @@ -156,17 +162,17 @@ func TestAutoregistrationLifecycle(t *testing.T) {
// make sure entry is still there with disconnect meta
c1.QueueUnregisterWorkload(p, origConnTime)
time.Sleep(features.WorkloadEntryCleanupGracePeriod / 2)
checkEntryOrFail(t, store, wgA, p, c1.instanceID)
checkEntryOrFail(t, store, wgA, p, n, c1.instanceID)
})
t.Run("different instance", func(t *testing.T) {
// disconnect, make sure entry is still there with disconnect metadata
c1.QueueUnregisterWorkload(p, time.Now())
time.Sleep(features.WorkloadEntryCleanupGracePeriod / 2)
checkEntryOrFail(t, store, wgA, p, "")
checkEntryOrFail(t, store, wgA, p, n, "")
// reconnect, ensure entry is there with the new instance id
origConnTime = time.Now()
c2.RegisterWorkload(p, origConnTime)
checkEntryOrFail(t, store, wgA, p, c2.instanceID)
checkEntryOrFail(t, store, wgA, p, n, c2.instanceID)
})
})
t.Run("slow reconnect", func(t *testing.T) {
Expand All @@ -178,7 +184,7 @@ func TestAutoregistrationLifecycle(t *testing.T) {
// reconnect
origConnTime = time.Now()
c1.RegisterWorkload(p, origConnTime)
checkEntryOrFail(t, store, wgA, p, c1.instanceID)
checkEntryOrFail(t, store, wgA, p, n, c1.instanceID)
})
t.Run("garbage collected if pilot stops after disconnect", func(t *testing.T) {
// disconnect, kill the cleanup queue from the first controller
Expand Down Expand Up @@ -251,11 +257,14 @@ func TestWorkloadEntryFromGroup(t *testing.T) {
Ports: map[string]uint32{"http": 80},
Labels: map[string]string{"app": "a"},
Weight: 1,
Network: "nw0",
Locality: "rgn1/zone1/subzone1",
ServiceAccount: "sa-a",
},
},
}
proxy := fakeProxy("10.0.0.1", group, "nw1")
node := fakeNode("rgn2", "zone2", "subzone2")

wantLabels := map[string]string{
"app": "a", // from WorkloadEntry template
Expand Down Expand Up @@ -288,12 +297,13 @@ func TestWorkloadEntryFromGroup(t *testing.T) {
},
Labels: wantLabels,
Network: "nw1",
Locality: "rgn2/zone2/subzone2",
Weight: 1,
ServiceAccount: "sa-a",
},
}

got := workloadEntryFromGroup("test-we", proxy, &group)
got := workloadEntryFromGroup("test-we", proxy, node, &group)
if diff := cmp.Diff(got, &want); diff != "" {
t.Errorf(diff)
}
Expand Down Expand Up @@ -324,6 +334,7 @@ func checkEntry(
store model.ConfigStoreCache,
wg config.Config,
proxy *model.Proxy,
node *core.Node,
connectedTo string,
) (err error) {
name := wg.Name + "-" + proxy.IPAddresses[0]
Expand Down Expand Up @@ -357,6 +368,14 @@ func checkEntry(
}
}

loc := tmpl.Template.Locality
if node.Locality != nil {
loc = util.LocalityToString(node.Locality)
}
if we.Locality != loc {
err = multierror.Append(fmt.Errorf("entry has locality %s; expected %s", we.Locality, loc))
}

// check controller annotations
if connectedTo != "" {
if v := cfg.Annotations[WorkloadControllerAnnotation]; v != connectedTo {
Expand Down Expand Up @@ -395,9 +414,10 @@ func checkEntryOrFail(
store model.ConfigStoreCache,
wg config.Config,
proxy *model.Proxy,
node *core.Node,
connectedTo string,
) {
if err := checkEntry(store, wg, proxy, connectedTo); err != nil {
if err := checkEntry(store, wg, proxy, node, connectedTo); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -460,6 +480,16 @@ func fakeProxy(ip string, wg config.Config, nw string) *model.Proxy {
}
}

// fakeNode builds a minimal bootstrap node for tests, carrying only a
// Locality assembled from the given region, zone, and subzone strings.
func fakeNode(r, z, sz string) *core.Node {
	loc := &core.Locality{
		Region:  r,
		Zone:    z,
		SubZone: sz,
	}
	return &core.Node{Locality: loc}
}

// createOrFail wraps config creation with convenience for failing tests
func createOrFail(t test.Failer, store model.ConfigStoreCache, cfg config.Config) {
if _, err := store.Create(cfg); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/xds/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (s *DiscoveryServer) initProxyState(node *core.Node, con *Connection) error
proxy := con.proxy
// this should be done before we look for service instances, but after we load metadata
// TODO fix check in kubecontroller treat echo VMs like there isn't a pod
if err := s.WorkloadEntryController.RegisterWorkload(proxy, con.Connect); err != nil {
if err := s.WorkloadEntryController.RegisterWorkload(proxy, node, con.Connect); err != nil {
return err
}
s.setProxyState(proxy, s.globalPushContext())
Expand Down

0 comments on commit f2d9fe4

Please sign in to comment.