Skip to content

Commit

Permalink
Merge pull request #2908 from davidfestal/tmc-e2e-sharding-step-1
Browse files Browse the repository at this point in the history
🌱 TMC E2E tests sharding support - step 1
  • Loading branch information
openshift-merge-robot authored Mar 29, 2023
2 parents b5e3a0e + d14686a commit ebf24bb
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 99 deletions.
31 changes: 21 additions & 10 deletions pkg/reconciler/workload/synctarget/synctarget_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,28 @@ func (c *Controller) reconcile(ctx context.Context, syncTarget *workloadv1alpha1

desiredURLs := map[string]workloadv1alpha1.VirtualWorkspace{}

var rootShardKey string
for _, workspaceShard := range workspaceShards {
if workspaceShard.Spec.ExternalURL != "" {
sharedExternalURL, err := url.Parse(workspaceShard.Spec.ExternalURL)
if workspaceShard.Spec.VirtualWorkspaceURL != "" {
shardVWURL, err := url.Parse(workspaceShard.Spec.VirtualWorkspaceURL)
if err != nil {
logger.Error(err, "failed to parse workspaceShard.Spec.ExternalURL")
logger.Error(err, "failed to parse workspaceShard.Spec.VirtualWorkspaceURL")
return nil, err
}

syncerVirtualWorkspaceURL := *sharedExternalURL
syncerVirtualWorkspaceURL := *shardVWURL
syncerVirtualWorkspaceURL.Path = path.Join(
sharedExternalURL.Path,
shardVWURL.Path,
virtualworkspacesoptions.DefaultRootPathPrefix,
syncerbuilder.SyncerVirtualWorkspaceName,
logicalcluster.From(syncTargetCopy).String(),
syncTargetCopy.Name,
string(syncTargetCopy.UID),
)

upsyncerVirtualWorkspaceURL := *sharedExternalURL
upsyncerVirtualWorkspaceURL := *shardVWURL
(&upsyncerVirtualWorkspaceURL).Path = path.Join(
sharedExternalURL.Path,
shardVWURL.Path,
virtualworkspacesoptions.DefaultRootPathPrefix,
syncerbuilder.UpsyncerVirtualWorkspaceName,
logicalcluster.From(syncTargetCopy).String(),
Expand All @@ -77,16 +78,26 @@ func (c *Controller) reconcile(ctx context.Context, syncTarget *workloadv1alpha1
syncerURL := (&syncerVirtualWorkspaceURL).String()
upsyncerURL := (&upsyncerVirtualWorkspaceURL).String()

desiredURLs[sharedExternalURL.String()] = workloadv1alpha1.VirtualWorkspace{
if workspaceShard.Name == corev1alpha1.RootShard {
rootShardKey = shardVWURL.String()
}
desiredURLs[shardVWURL.String()] = workloadv1alpha1.VirtualWorkspace{
SyncerURL: syncerURL,
UpsyncerURL: upsyncerURL,
}
}
}

// Let's always add the desired URL in the same order, which will be the order of the
// corresponding shard URLs
// Let's always add the desired URLs in the same order:
// - urls for the root shard will always be added at the first place,
// in order to ensure compatibility with the shard-unaware Syncer
// - urls for other shards which will be added in the lexical order of the
// corresponding shard URLs.
var desiredVirtualWorkspaces []workloadv1alpha1.VirtualWorkspace //nolint:prealloc
if rootShardVWURLs, ok := desiredURLs[rootShardKey]; ok {
desiredVirtualWorkspaces = append(desiredVirtualWorkspaces, rootShardVWURLs)
delete(desiredURLs, rootShardKey)
}
for _, shardURL := range sets.StringKeySet(desiredURLs).List() {
desiredVirtualWorkspaces = append(desiredVirtualWorkspaces, desiredURLs[shardURL])
}
Expand Down
112 changes: 95 additions & 17 deletions pkg/reconciler/workload/synctarget/synctarget_reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func TestReconciler(t *testing.T) {
Name: "root",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
ExternalURL: "http://external-host/",
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host/",
},
},
},
Expand Down Expand Up @@ -96,26 +96,26 @@ func TestReconciler(t *testing.T) {
Name: "root2",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
ExternalURL: "http://external-host-2/",
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host-2/",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "root3",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
ExternalURL: "http://external-host-3/",
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host-3/",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "root",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
ExternalURL: "http://external-host-1/",
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host-1/",
},
},
},
Expand Down Expand Up @@ -167,33 +167,111 @@ func TestReconciler(t *testing.T) {
},
expectError: false,
},
"SyncTarget with multiple Shards with duplicated ExternalURLs results in a deduplicated list of URLs on the SyncTarget": {
"SyncTarget and multiple Shards, but root shard always first": {
workspaceShards: []*corev1alpha1.Shard{
{
ObjectMeta: metav1.ObjectMeta{
Name: "root2",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host-2/",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "root3",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host-3/",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "root",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host-10/",
},
},
},
syncTarget: &workloadv1alpha1.SyncTarget{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cluster",
Annotations: map[string]string{
logicalcluster.AnnotationKey: "demo:root:yourworkspace",
},
},
Spec: workloadv1alpha1.SyncTargetSpec{
Unschedulable: false,
EvictAfter: nil,
},
Status: workloadv1alpha1.SyncTargetStatus{
VirtualWorkspaces: []workloadv1alpha1.VirtualWorkspace{},
},
},
expectedSyncTarget: &workloadv1alpha1.SyncTarget{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cluster",
Annotations: map[string]string{
logicalcluster.AnnotationKey: "demo:root:yourworkspace",
},
Labels: map[string]string{
"internal.workload.kcp.io/key": "aPXkBdRsTD8gXESO47r9qXmkr2kaG5qaox5C8r",
},
},
Spec: workloadv1alpha1.SyncTargetSpec{
Unschedulable: false,
EvictAfter: nil,
},
Status: workloadv1alpha1.SyncTargetStatus{
VirtualWorkspaces: []workloadv1alpha1.VirtualWorkspace{
{
SyncerURL: "http://external-host-10/services/syncer/demo:root:yourworkspace/test-cluster",
UpsyncerURL: "http://external-host-10/services/upsyncer/demo:root:yourworkspace/test-cluster",
},
{
SyncerURL: "http://external-host-2/services/syncer/demo:root:yourworkspace/test-cluster",
UpsyncerURL: "http://external-host-2/services/upsyncer/demo:root:yourworkspace/test-cluster",
},
{
SyncerURL: "http://external-host-3/services/syncer/demo:root:yourworkspace/test-cluster",
UpsyncerURL: "http://external-host-3/services/upsyncer/demo:root:yourworkspace/test-cluster",
},
},
},
},
expectError: false,
},
"SyncTarget with multiple Shards with duplicated VirtualWorkspaceURLs results in a deduplicated list of URLs on the SyncTarget": {
workspaceShards: []*corev1alpha1.Shard{
{
ObjectMeta: metav1.ObjectMeta{
Name: "root",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
ExternalURL: "http://external-host-1/",
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host-1/",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "root2",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
ExternalURL: "http://external-host-1/",
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host-1/",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "root3",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
ExternalURL: "http://external-host-3/",
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host-3/",
},
},
},
Expand Down Expand Up @@ -282,8 +360,8 @@ func TestReconciler(t *testing.T) {
Name: "root",
},
Spec: corev1alpha1.ShardSpec{
BaseURL: "http://1.2.3.4/",
ExternalURL: "http://external-host-1/",
BaseURL: "http://1.2.3.4/",
VirtualWorkspaceURL: "http://external-host-1/",
},
},
},
Expand Down
9 changes: 9 additions & 0 deletions test/e2e/framework/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ type RunningServer interface {
BaseConfig(t *testing.T) *rest.Config
RootShardSystemMasterBaseConfig(t *testing.T) *rest.Config
ShardSystemMasterBaseConfig(t *testing.T, shard string) *rest.Config
ShardNames() []string
Artifact(t *testing.T, producer func() (runtime.Object, error))
ClientCAUserConfig(t *testing.T, config *rest.Config, name string, groups ...string) *rest.Config
CADirectory() string
Expand Down Expand Up @@ -853,6 +854,10 @@ func (c *kcpServer) ShardSystemMasterBaseConfig(t *testing.T, shard string) *res
return c.RootShardSystemMasterBaseConfig(t)
}

func (c *kcpServer) ShardNames() []string {
return []string{corev1alpha1.RootShard}
}

// RawConfig exposes a copy of the client config for this server.
func (c *kcpServer) RawConfig() (clientcmdapi.Config, error) {
c.lock.Lock()
Expand Down Expand Up @@ -1117,6 +1122,10 @@ func (s *unmanagedKCPServer) ShardSystemMasterBaseConfig(t *testing.T, shard str
return wrappedCfg
}

func (s *unmanagedKCPServer) ShardNames() []string {
return sets.StringKeySet(s.shardCfgs).List()
}

func (s *unmanagedKCPServer) Artifact(t *testing.T, producer func() (runtime.Object, error)) {
t.Helper()
artifact(t, s, producer)
Expand Down
58 changes: 23 additions & 35 deletions test/e2e/framework/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/kcp-dev/kcp/pkg/syncer"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
apiresourcev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apiresource/v1alpha1"
scheduling1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1"
conditionsv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/apis/conditions/v1alpha1"
"github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions"
Expand Down Expand Up @@ -259,6 +260,8 @@ func (sf *syncerFixture) CreateSyncTargetAndApplyToDownstream(t *testing.T) *app
kcpClusterClient, err := kcpclientset.NewForConfig(upstreamCfg)
require.NoError(t, err, "error creating upstream kcp client")

gather(upstreamClusterDynamic.Cluster(sf.syncTargetPath), workloadv1alpha1.SchemeGroupVersion.WithResource("synctargets"))
gather(upstreamClusterDynamic.Cluster(sf.syncTargetPath), scheduling1alpha1.SchemeGroupVersion.WithResource("locations"))
gather(upstreamClusterDynamic.Cluster(sf.syncTargetPath), apiresourcev1alpha1.SchemeGroupVersion.WithResource("apiresourceimports"))
gather(upstreamClusterDynamic.Cluster(sf.syncTargetPath), apiresourcev1alpha1.SchemeGroupVersion.WithResource("negotiatedapiresources"))
gather(upstreamClusterDynamic.Cluster(sf.syncTargetPath), corev1.SchemeGroupVersion.WithResource("namespaces"))
Expand Down Expand Up @@ -310,42 +313,27 @@ func (sf *syncerFixture) CreateSyncTargetAndApplyToDownstream(t *testing.T) *app
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

rawConfig, err := sf.upstreamServer.RawConfig()
require.NoError(t, err)

kcpClusterClient, err := kcpclientset.NewForConfig(syncerConfig.UpstreamConfig)
require.NoError(t, err)
var virtualWorkspaceURL string
var syncTargetClusterName logicalcluster.Name
Eventually(t, func() (success bool, reason string) {
syncTarget, err := kcpClusterClient.Cluster(syncerConfig.SyncTargetPath).WorkloadV1alpha1().SyncTargets().Get(ctx, syncerConfig.SyncTargetName, metav1.GetOptions{})
require.NoError(t, err)
if len(syncTarget.Status.VirtualWorkspaces) != 1 {
return false, ""
}
virtualWorkspaceURL = syncTarget.Status.VirtualWorkspaces[0].SyncerURL
syncTargetClusterName = logicalcluster.From(syncTarget)
return true, "Virtual workspace URL is available"
}, wait.ForeverTestTimeout, 100*time.Millisecond, "Syncer Virtual Workspace URL not available")

virtualWorkspaceRawConfig := rawConfig.DeepCopy()
virtualWorkspaceRawConfig.Clusters["syncer"] = rawConfig.Clusters["base"].DeepCopy()
virtualWorkspaceRawConfig.Clusters["syncer"].Server = virtualWorkspaceURL
virtualWorkspaceRawConfig.Contexts["syncer"] = rawConfig.Contexts["base"].DeepCopy()
virtualWorkspaceRawConfig.Contexts["syncer"].Cluster = "syncer"
virtualWorkspaceRawConfig.Clusters["upsyncer"] = rawConfig.Clusters["base"].DeepCopy()
virtualWorkspaceRawConfig.Clusters["upsyncer"].Server = strings.Replace(virtualWorkspaceURL, "/services/syncer/", "/services/upsyncer/", 1)
virtualWorkspaceRawConfig.Contexts["upsyncer"] = rawConfig.Contexts["base"].DeepCopy()
virtualWorkspaceRawConfig.Contexts["upsyncer"].Cluster = "upsyncer"
syncerVWConfig, err := clientcmd.NewNonInteractiveClientConfig(*virtualWorkspaceRawConfig, "syncer", nil, nil).ClientConfig()
require.NoError(t, err)
syncerVWConfig = rest.AddUserAgent(rest.CopyConfig(syncerVWConfig), t.Name())
require.NoError(t, err)
upsyncerVWConfig, err := clientcmd.NewNonInteractiveClientConfig(*virtualWorkspaceRawConfig, "upsyncer", nil, nil).ClientConfig()
require.NoError(t, err)
upsyncerVWConfig = rest.AddUserAgent(rest.CopyConfig(upsyncerVWConfig), t.Name())
syncTarget, err := kcpClusterClient.Cluster(syncerConfig.SyncTargetPath).WorkloadV1alpha1().SyncTargets().Get(ctx, syncerConfig.SyncTargetName, metav1.GetOptions{})
require.NoError(t, err)

syncTargetClusterName = logicalcluster.From(syncTarget)

getVWURLs := func(toURL func(workloadv1alpha1.VirtualWorkspace) string) func() []string {
return func() []string {
syncTarget, err := kcpClusterClient.Cluster(syncerConfig.SyncTargetPath).WorkloadV1alpha1().SyncTargets().Get(ctx, syncerConfig.SyncTargetName, metav1.GetOptions{})
require.NoError(t, err)

var urls []string
for _, vw := range syncTarget.Status.VirtualWorkspaces {
urls = append(urls, toURL(vw))
}
return urls
}
}

return &appliedSyncerFixture{
syncerFixture: *sf,

Expand All @@ -356,8 +344,8 @@ func (sf *syncerFixture) CreateSyncTargetAndApplyToDownstream(t *testing.T) *app
DownstreamKubeClient: downstreamKubeClient,
DownstreamKubeconfigPath: downstreamKubeconfigPath,

SyncerVirtualWorkspaceConfig: syncerVWConfig,
UpsyncerVirtualWorkspaceConfig: upsyncerVWConfig,
GetSyncerVirtualWorkspaceURLs: getVWURLs(func(vw workloadv1alpha1.VirtualWorkspace) string { return vw.SyncerURL }),
GetUpsyncerVirtualWorkspaceURLs: getVWURLs(func(vw workloadv1alpha1.VirtualWorkspace) string { return vw.UpsyncerURL }),
}
}

Expand Down Expand Up @@ -571,8 +559,8 @@ type appliedSyncerFixture struct {
DownstreamKubeClient kubernetesclient.Interface
DownstreamKubeconfigPath string

SyncerVirtualWorkspaceConfig *rest.Config
UpsyncerVirtualWorkspaceConfig *rest.Config
GetSyncerVirtualWorkspaceURLs func() []string
GetUpsyncerVirtualWorkspaceURLs func() []string

stopHeartBeat context.CancelFunc
stopSyncerTunnel context.CancelFunc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestDeploymentCoordinator(t *testing.T) {
}
}, wait.ForeverTestTimeout, time.Millisecond*100, "should create the deployment after the deployments resource is available in workspace %q", workspace.clusterName)

t.Logf("Wait for the workload in workspace %q to be started and available with 4 replicas", workspace.clusterName)
t.Logf("Wait for the workload in workspace %q to be started and available with %d replicas", workspace.clusterName, workspace.requestedReplicas)
func() {
defer dumpEventsAndPods(wkspDownstreamInfo)

Expand Down
Loading

0 comments on commit ebf24bb

Please sign in to comment.