diff --git a/pilot/pkg/controller/workloadentry/workloadentry_controller.go b/pilot/pkg/controller/workloadentry/workloadentry_controller.go
index 23d2784975c1..768a70feecdc 100644
--- a/pilot/pkg/controller/workloadentry/workloadentry_controller.go
+++ b/pilot/pkg/controller/workloadentry/workloadentry_controller.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/cenkalti/backoff"
+	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	"github.com/gogo/protobuf/types"
 	"golang.org/x/time/rate"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -216,7 +217,7 @@ func setConnectMeta(c *config.Config, controller string, conTime time.Time) {
 	delete(c.Annotations, DisconnectedAtAnnotation)
 }
 
-func (c *Controller) RegisterWorkload(proxy *model.Proxy, conTime time.Time) error {
+func (c *Controller) RegisterWorkload(proxy *model.Proxy, node *core.Node, conTime time.Time) error {
 	if !features.WorkloadEntryAutoRegistration || c == nil {
 		return nil
 	}
@@ -230,14 +231,14 @@ func (c *Controller) RegisterWorkload(proxy *model.Proxy, conTime time.Time) err
 	c.adsConnections[makeProxyKey(proxy)]++
 	c.mutex.Unlock()
 
-	if err := c.registerWorkload(entryName, proxy, conTime); err != nil {
+	if err := c.registerWorkload(entryName, proxy, node, conTime); err != nil {
 		log.Errorf(err)
 		return err
 	}
 	return nil
 }
 
-func (c *Controller) registerWorkload(entryName string, proxy *model.Proxy, conTime time.Time) error {
+func (c *Controller) registerWorkload(entryName string, proxy *model.Proxy, node *core.Node, conTime time.Time) error {
 	wle := c.store.Get(gvk.WorkloadEntry, entryName, proxy.Metadata.Namespace)
 	if wle != nil {
 		lastConTime, _ := time.Parse(timeFormat, wle.Annotations[ConnectedAtAnnotation])
@@ -263,7 +264,7 @@ func (c *Controller) registerWorkload(entryName string, proxy *model.Proxy, conT
 		return fmt.Errorf("auto-registration WorkloadEntry of %v failed: cannot find WorkloadGroup %s/%s",
 			proxy.ID, proxy.Metadata.Namespace, proxy.Metadata.AutoRegisterGroup)
 	}
-	entry := workloadEntryFromGroup(entryName, proxy, groupCfg)
+	entry := workloadEntryFromGroup(entryName, proxy, node, groupCfg)
 	setConnectMeta(entry, c.instanceID, conTime)
 	_, err := c.store.Create(*entry)
 	if err != nil {
@@ -553,7 +554,7 @@ func mergeLabels(labels ...map[string]string) map[string]string {
 
 var workloadGroupIsController = true
 
-func workloadEntryFromGroup(name string, proxy *model.Proxy, groupCfg *config.Config) *config.Config {
+func workloadEntryFromGroup(name string, proxy *model.Proxy, node *core.Node, groupCfg *config.Config) *config.Config {
 	group := groupCfg.Spec.(*v1alpha3.WorkloadGroup)
 	entry := group.Template.DeepCopy()
 	entry.Address = proxy.IPAddresses[0]
@@ -574,8 +575,8 @@ func workloadEntryFromGroup(name string, proxy *model.Proxy, groupCfg *config.Co
 	if proxy.Metadata.Network != "" {
 		entry.Network = proxy.Metadata.Network
 	}
-	if proxy.Locality != nil {
-		entry.Locality = util.LocalityToString(proxy.Locality)
+	if node.Locality != nil {
+		entry.Locality = util.LocalityToString(node.Locality)
 	}
 	if proxy.Metadata.ProxyConfig != nil && proxy.Metadata.ProxyConfig.ReadinessProbe != nil {
 		annotations[status.WorkloadEntryHealthCheckAnnotation] = "true"
diff --git a/pilot/pkg/controller/workloadentry/workloadentry_controller_test.go b/pilot/pkg/controller/workloadentry/workloadentry_controller_test.go
index aae962cdba75..b3979f73eb29 100644
--- a/pilot/pkg/controller/workloadentry/workloadentry_controller_test.go
+++ b/pilot/pkg/controller/workloadentry/workloadentry_controller_test.go
@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"
 
+	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	"github.com/google/go-cmp/cmp"
 	"github.com/hashicorp/go-multierror"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -31,6 +32,7 @@ import (
 	"istio.io/istio/pilot/pkg/config/memory"
 	"istio.io/istio/pilot/pkg/features"
 	"istio.io/istio/pilot/pkg/model"
+	"istio.io/istio/pilot/pkg/networking/util"
 	"istio.io/istio/pkg/config"
 	"istio.io/istio/pkg/config/schema/collections"
 	"istio.io/istio/pkg/config/schema/gvk"
@@ -55,6 +57,8 @@ var (
 		Template: &v1alpha3.WorkloadEntry{
 			Ports:          map[string]uint32{"http": 80},
 			Labels:         map[string]string{"app": "a"},
+			Network:        "nw0",
+			Locality:       "reg0/zone0/subzone0",
 			Weight:         1,
 			ServiceAccount: "sa-a",
 		},
@@ -124,30 +128,32 @@ func TestAutoregistrationLifecycle(t *testing.T) {
 	p2 := fakeProxy("1.2.3.4", wgA, "nw2")
 	p3 := fakeProxy("1.2.3.5", wgA, "nw1")
 
+	n := fakeNode("reg1", "zone1", "subzone1")
+
 	// allows associating a Register call with Unregister
 	var origConnTime time.Time
 
 	t.Run("initial registration", func(t *testing.T) {
 		// simply make sure the entry exists after connecting
 		c1.RegisterWorkload(p, time.Now())
-		checkEntryOrFail(t, store, wgA, p, c1.instanceID)
+		checkEntryOrFail(t, store, wgA, p, n, c1.instanceID)
 	})
 	t.Run("multinetwork same ip", func(t *testing.T) {
 		// make sure we don't overrwrite a similar entry for a different network
 		c2.RegisterWorkload(p2, time.Now())
-		checkEntryOrFail(t, store, wgA, p, c1.instanceID)
-		checkEntryOrFail(t, store, wgA, p2, c2.instanceID)
+		checkEntryOrFail(t, store, wgA, p, n, c1.instanceID)
+		checkEntryOrFail(t, store, wgA, p2, n, c2.instanceID)
 	})
 	t.Run("fast reconnect", func(t *testing.T) {
 		t.Run("same instance", func(t *testing.T) {
 			// disconnect, make sure entry is still there with disconnect meta
 			c1.QueueUnregisterWorkload(p, time.Now())
 			time.Sleep(features.WorkloadEntryCleanupGracePeriod / 2)
-			checkEntryOrFail(t, store, wgA, p, "")
+			checkEntryOrFail(t, store, wgA, p, n, "")
 			// reconnect, ensure entry is there with the same instance id
 			origConnTime = time.Now()
 			c1.RegisterWorkload(p, origConnTime)
-			checkEntryOrFail(t, store, wgA, p, c1.instanceID)
+			checkEntryOrFail(t, store, wgA, p, n, c1.instanceID)
 		})
 		t.Run("same instance: connect before disconnect ", func(t *testing.T) {
 			// reconnect, ensure entry is there with the same instance id
@@ -156,17 +162,17 @@ func TestAutoregistrationLifecycle(t *testing.T) {
 			// make sure entry is still there with disconnect meta
 			c1.QueueUnregisterWorkload(p, origConnTime)
 			time.Sleep(features.WorkloadEntryCleanupGracePeriod / 2)
-			checkEntryOrFail(t, store, wgA, p, c1.instanceID)
+			checkEntryOrFail(t, store, wgA, p, n, c1.instanceID)
 		})
 		t.Run("different instance", func(t *testing.T) {
 			// disconnect, make sure entry is still there with disconnect metadata
 			c1.QueueUnregisterWorkload(p, time.Now())
 			time.Sleep(features.WorkloadEntryCleanupGracePeriod / 2)
-			checkEntryOrFail(t, store, wgA, p, "")
+			checkEntryOrFail(t, store, wgA, p, n, "")
 			// reconnect, ensure entry is there with the new instance id
 			origConnTime = time.Now()
 			c2.RegisterWorkload(p, origConnTime)
-			checkEntryOrFail(t, store, wgA, p, c2.instanceID)
+			checkEntryOrFail(t, store, wgA, p, n, c2.instanceID)
 		})
 	})
 	t.Run("slow reconnect", func(t *testing.T) {
@@ -178,7 +184,7 @@ func TestAutoregistrationLifecycle(t *testing.T) {
 		// reconnect
 		origConnTime = time.Now()
 		c1.RegisterWorkload(p, origConnTime)
-		checkEntryOrFail(t, store, wgA, p, c1.instanceID)
+		checkEntryOrFail(t, store, wgA, p, n, c1.instanceID)
 	})
 	t.Run("garbage collected if pilot stops after disconnect", func(t *testing.T) {
 		// disconnect, kill the cleanup queue from the first controller
@@ -251,11 +257,14 @@ func TestWorkloadEntryFromGroup(t *testing.T) {
 				Ports:          map[string]uint32{"http": 80},
 				Labels:         map[string]string{"app": "a"},
 				Weight:         1,
+				Network:        "nw0",
+				Locality:       "rgn1/zone1/subzone1",
 				ServiceAccount: "sa-a",
 			},
 		},
 	}
 	proxy := fakeProxy("10.0.0.1", group, "nw1")
+	node := fakeNode("rgn2", "zone2", "subzone2")
 
 	wantLabels := map[string]string{
 		"app": "a", // from WorkloadEntry template
@@ -288,12 +297,13 @@ func TestWorkloadEntryFromGroup(t *testing.T) {
 			},
 			Labels:         wantLabels,
 			Network:        "nw1",
+			Locality:       "rgn2/zone2/subzone2",
 			Weight:         1,
 			ServiceAccount: "sa-a",
 		},
 	}
 
-	got := workloadEntryFromGroup("test-we", proxy, &group)
+	got := workloadEntryFromGroup("test-we", proxy, node, &group)
 	if diff := cmp.Diff(got, &want); diff != "" {
 		t.Errorf(diff)
 	}
@@ -324,6 +334,7 @@ func checkEntry(
 	store model.ConfigStoreCache,
 	wg config.Config,
 	proxy *model.Proxy,
+	node *core.Node,
 	connectedTo string,
 ) (err error) {
 	name := wg.Name + "-" + proxy.IPAddresses[0]
@@ -357,6 +368,14 @@ func checkEntry(
 		}
 	}
 
+	loc := tmpl.Template.Locality
+	if node.Locality != nil {
+		loc = util.LocalityToString(node.Locality)
+	}
+	if we.Locality != loc {
+		err = multierror.Append(fmt.Errorf("entry has locality %s; expected %s", we.Locality, loc))
+	}
+
 	// check controller annotations
 	if connectedTo != "" {
 		if v := cfg.Annotations[WorkloadControllerAnnotation]; v != connectedTo {
@@ -395,9 +414,10 @@ func checkEntryOrFail(
 	store model.ConfigStoreCache,
 	wg config.Config,
 	proxy *model.Proxy,
+	node *core.Node,
 	connectedTo string,
 ) {
-	if err := checkEntry(store, wg, proxy, connectedTo); err != nil {
+	if err := checkEntry(store, wg, proxy, node, connectedTo); err != nil {
 		t.Fatal(err)
 	}
 }
@@ -460,6 +480,16 @@ func fakeProxy(ip string, wg config.Config, nw string) *model.Proxy {
 	}
 }
 
+func fakeNode(r, z, sz string) *core.Node {
+	return &core.Node{
+		Locality: &core.Locality{
+			Region:  r,
+			Zone:    z,
+			SubZone: sz,
+		},
+	}
+}
+
 // createOrFail wraps config creation with convience for failing tests
 func createOrFail(t test.Failer, store model.ConfigStoreCache, cfg config.Config) {
 	if _, err := store.Create(cfg); err != nil {
diff --git a/pilot/pkg/xds/ads.go b/pilot/pkg/xds/ads.go
index e65db8a01ac1..7385132d409a 100644
--- a/pilot/pkg/xds/ads.go
+++ b/pilot/pkg/xds/ads.go
@@ -551,7 +551,7 @@ func (s *DiscoveryServer) initProxyState(node *core.Node, con *Connection) error
 	proxy := con.proxy
 	// this should be done before we look for service instances, but after we load metadata
 	// TODO fix check in kubecontroller treat echo VMs like there isn't a pod
-	if err := s.WorkloadEntryController.RegisterWorkload(proxy, con.Connect); err != nil {
+	if err := s.WorkloadEntryController.RegisterWorkload(proxy, node, con.Connect); err != nil {
 		return err
 	}
 	s.setProxyState(proxy, s.globalPushContext())