Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
Signed-off-by: Jian Qiu <jqiu@redhat.com>
  • Loading branch information
qiujian16 committed Mar 27, 2023
1 parent e54daad commit 7644e05
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 63 deletions.
20 changes: 10 additions & 10 deletions pkg/cliplugins/workload/plugin/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/validation"
Expand All @@ -60,6 +61,7 @@ import (
"github.com/kcp-dev/kcp/pkg/cliplugins/base"
"github.com/kcp-dev/kcp/pkg/cliplugins/helpers"
kcpfeatures "github.com/kcp-dev/kcp/pkg/features"
"github.com/kcp-dev/kcp/pkg/reconciler/workload/apiexport"
)

//go:embed *.yaml
Expand Down Expand Up @@ -715,22 +717,20 @@ func mergeLatestResourceSchema(apiExport *apisv1alpha1.APIExport, resourceToSync
desiredResourceGroup := sets.NewString()
var modified bool
for _, schema := range apiExport.Spec.LatestResourceSchemas {
comps := strings.SplitN(schema, ".", 3)
if len(comps) != 3 {
gr, valid := apiexport.ParseAPIResourceSchemaName(schema)
if !valid {
continue
}
desiredResourceGroup.Insert(fmt.Sprintf("%s.%s", comps[1], comps[2]))
desiredResourceGroup.Insert(gr.String())
}
for _, resource := range resourceToSync {
nameParts := strings.SplitN(resource, ".", 2)
resourceName := nameParts[0]
apiGroup := "core"
if len(nameParts) > 1 {
apiGroup = nameParts[1]
gr := schema.ParseGroupResource(resource)
if len(gr.Group) == 0 {
gr.Group = "core"
}
if !desiredResourceGroup.Has(fmt.Sprintf("%s.%s", resourceName, apiGroup)) {
if !desiredResourceGroup.Has(gr.String()) {
// the rev-0 here is a placeholder and will be replaced by rv of negotiated APIResourceSchema finally.
schemaName := fmt.Sprintf("rev-0.%s.%s", resourceName, apiGroup)
schemaName := fmt.Sprintf("rev-0.%s", gr.String())
apiExport.Spec.LatestResourceSchemas = append(apiExport.Spec.LatestResourceSchemas, schemaName)
modified = true
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/reconciler/workload/apiexport/apiresourceschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package apiexport

import (
"strings"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

apiresourcev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apiresource/v1alpha1"
apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
Expand Down Expand Up @@ -72,3 +75,12 @@ func toAPIResourceSchema(r *apiresourcev1alpha1.NegotiatedAPIResource, name stri

return schema
}

// ParseAPIResourceSchemaName parses name of APIResourceSchema to a gr and schema if it is valid.
func ParseAPIResourceSchemaName(name string) (schema.GroupResource, bool) {
comps := strings.SplitN(name, ".", 3)
if len(comps) < 3 {
return schema.GroupResource{}, false
}
return schema.GroupResource{Resource: comps[1], Group: comps[2]}, true
}
69 changes: 24 additions & 45 deletions pkg/reconciler/workload/apiexport/workload_apiexport_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"reflect"
"strings"
"time"

"github.com/kcp-dev/logicalcluster/v3"
Expand All @@ -29,6 +28,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -78,35 +78,35 @@ func (r *schemaReconciler) reconcile(ctx context.Context, export *apisv1alpha1.A
return reconcileStatusStop, nil
}

resourcesByResourceGroup := map[string]*apiresourcev1alpha1.NegotiatedAPIResource{}
resourcesByResourceGroup := map[schema.GroupResource]*apiresourcev1alpha1.NegotiatedAPIResource{}
for _, r := range resources {
// TODO(sttts): what about multiple versions of the same resource? Something is missing in the apiresource APIs and controllers to support that.
resource, _, group, ok := split3(r.Name, ".")
if !ok {
continue
gr := schema.GroupResource{
Group: r.Spec.GroupVersion.Group,
Resource: r.Spec.Plural,
}
schemaName := fmt.Sprintf("%s.%s", resource, group)

resourcesByResourceGroup[schemaName] = r
if gr.Group == "" {
gr.Group = "core"
}
resourcesByResourceGroup[gr] = r
}

// reconcile schemas in export
// we check all schemas reference in the apiexport. if it is missing, we should create the schema
// if it is outdated, we should create the schema and delete the outdated schema.
upToDateResourceGroup := sets.NewString()
upToDateResourceGroups := sets.NewString()
expectedResourceGroups := sets.NewString()

// schemaNamesByResourceGroup records the upToData APIResourceSchemaName for the resourceGroup, and the
// schemaNamesByResourceGroup records the up-to-date APIResourceSchemaName for the resourceGroup, and the
// APIExport will be updated accordingly
schemaNamesByResourceGroup := map[string]string{}
schemaNamesByResourceGroup := map[schema.GroupResource]string{}
for _, schemaName := range export.Spec.LatestResourceSchemas {
_, resource, group, ok := split3(schemaName, ".")
gr, ok := ParseAPIResourceSchemaName(schemaName)
if !ok {
continue
}
resourceGroup := fmt.Sprintf("%s.%s", resource, group)
expectedResourceGroups.Insert(resourceGroup)
schemaNamesByResourceGroup[resourceGroup] = schemaName
expectedResourceGroups.Insert(gr.String())
schemaNamesByResourceGroup[gr] = schemaName

existingSchema, err := r.getAPIResourceSchema(ctx, clusterName, schemaName)
if apierrors.IsNotFound(err) {
Expand All @@ -117,7 +117,7 @@ func (r *schemaReconciler) reconcile(ctx context.Context, export *apisv1alpha1.A
}

// negotiated schema gone?
negotiated, ok := resourcesByResourceGroup[resourceGroup]
negotiated, ok := resourcesByResourceGroup[gr]
if !ok {
continue
}
Expand All @@ -126,16 +126,17 @@ func (r *schemaReconciler) reconcile(ctx context.Context, export *apisv1alpha1.A
newSchema := toAPIResourceSchema(negotiated, "")
if equality.Semantic.DeepEqual(&existingSchema.Spec, &newSchema.Spec) {
// nothing to do
upToDateResourceGroup.Insert(resourceGroup)
upToDateResourceGroups.Insert(gr.String())
}
}

// create missing or outdated schemas
outdatedOrMissing := expectedResourceGroups.Difference(upToDateResourceGroup)
outdatedOrMissing := expectedResourceGroups.Difference(upToDateResourceGroups)
outDatedSchemaNames := sets.NewString()
for _, resourceGroup := range outdatedOrMissing.List() {
logger.WithValues("schema", resourceGroup).V(2).Info("missing or outdated schema on APIExport, adding")
resource, ok := resourcesByResourceGroup[resourceGroup]
gr := schema.ParseGroupResource(resourceGroup)
resource, ok := resourcesByResourceGroup[gr]
if !ok {
// no negotiated schema, keep the current schema name.
continue
Expand All @@ -152,30 +153,22 @@ func (r *schemaReconciler) reconcile(ctx context.Context, export *apisv1alpha1.A
}
schema, err = r.createAPIResourceSchema(ctx, clusterName.Path(), schema)
if apierrors.IsAlreadyExists(err) {
// TODO(qiujian16) should update/recreate the existing schema?
schema, err = r.getAPIResourceSchema(ctx, clusterName, schemaName)
}
if err != nil {
return reconcileStatusStop, err
}

if _, ok := schemaNamesByResourceGroup[resourceGroup]; ok {
outDatedSchemaNames.Insert(schemaNamesByResourceGroup[resourceGroup])
if _, ok := schemaNamesByResourceGroup[gr]; ok {
outDatedSchemaNames.Insert(schemaNamesByResourceGroup[gr])
}
schemaNamesByResourceGroup[resourceGroup] = schema.Name
schemaNamesByResourceGroup[gr] = schema.Name
}

// update schema list in export
old := export.DeepCopy()
export.Spec.LatestResourceSchemas = []string{}
for _, resourceGroup := range expectedResourceGroups.List() {
schemaName, ok := schemaNamesByResourceGroup[resourceGroup]
if !ok {
// should not happen. We should have all schemas by now
logger.WithValues("schema", resourceGroup).Error(fmt.Errorf("schema for resource %q not found", resourceGroup), "unexpectedly missing schema for resource in APIExport")
return reconcileStatusStop, nil
}

for _, schemaName := range schemaNamesByResourceGroup {
export.Spec.LatestResourceSchemas = append(export.Spec.LatestResourceSchemas, schemaName)
}
if !reflect.DeepEqual(old.Spec.LatestResourceSchemas, export.Spec.LatestResourceSchemas) {
Expand All @@ -186,12 +179,6 @@ func (r *schemaReconciler) reconcile(ctx context.Context, export *apisv1alpha1.A

// delete schemas that are no longer needed
for schemaName := range outDatedSchemaNames {
if _, err := r.getAPIResourceSchema(ctx, clusterName, schemaName); err != nil {
if apierrors.IsNotFound(err) {
continue
}
return reconcileStatusStop, err
}
logger.V(2).Info("deleting schema of APIExport", "APIResourceSchema", schemaName)
if err := r.deleteAPIResourceSchema(ctx, clusterName.Path(), schemaName); err != nil && !apierrors.IsNotFound(err) {
return reconcileStatusStop, err
Expand All @@ -201,14 +188,6 @@ func (r *schemaReconciler) reconcile(ctx context.Context, export *apisv1alpha1.A
return reconcileStatusContinue, nil
}

func split3(s string, sep string) (string, string, string, bool) {
comps := strings.SplitN(s, sep, 3)
if len(comps) != 3 {
return "", "", "", false
}
return comps[0], comps[1], comps[2], true
}

func (c *controller) reconcile(ctx context.Context, export *apisv1alpha1.APIExport) error {
reconcilers := []reconciler{
&schemaReconciler{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func TestSchemaReconciler(t *testing.T) {
wantExportUpdates: map[string]ExportCheck{
workloadv1alpha1.ImportedAPISExportName: hasSchemas("rev-52.deployments.apps"),
},
wantSchemaDeletes: map[string]struct{}{"rev-0.deployments.apps": {}},
wantReconcileStatus: reconcileStatusContinue,
},
"non-triple schema name": {
Expand All @@ -252,6 +253,7 @@ func TestSchemaReconciler(t *testing.T) {
wantExportUpdates: map[string]ExportCheck{
workloadv1alpha1.ImportedAPISExportName: hasSchemas("rev-15.services.core"),
},
wantSchemaDeletes: map[string]struct{}{"rev-0.services.core": {}},
wantReconcileStatus: reconcileStatusContinue,
},
"dangling schema in export": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ func indexSyncTargetsByExports(obj interface{}) ([]string, error) {
}

clusterName := logicalcluster.From(synctarget)
if len(synctarget.Spec.SupportedAPIExports) == 0 {
return []string{clusterName.Path().Join(workloadv1alpha1.ImportedAPISExportName).String()}, nil
}

keys := make([]string, 0, len(synctarget.Spec.SupportedAPIExports))
for _, export := range synctarget.Spec.SupportedAPIExports {
if len(export.Path) == 0 {
Expand Down
12 changes: 12 additions & 0 deletions test/e2e/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/utils/pointer"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
Expand Down Expand Up @@ -90,6 +91,17 @@ func TestSyncerLifecycle(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

kcpClient, err := kcpclientset.NewForConfig(upstreamServer.BaseConfig(t))
require.NoError(t, err)
t.Logf("Waiting for imported-apis APIBinding to be ready...")
require.Eventually(t, func() bool {
binding, err := kcpClient.Cluster(wsPath).ApisV1alpha1().APIBindings().Get(ctx, workloadv1alpha1.ImportedAPISExportName, metav1.GetOptions{})
if err != nil {
return false
}
return binding.Status.Phase == apisv1alpha1.APIBindingPhaseBound
}, wait.ForeverTestTimeout, time.Millisecond*100, "imprted-apis APIBinding is not bound")

t.Logf("Bind location workspace")
framework.NewBindCompute(t, wsPath, upstreamServer).Bind(t)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ func IndexSyncTargetsByExports(obj interface{}) ([]string, error) {
syncTarget := obj.(*workloadv1alpha1.SyncTarget)

clusterName := logicalcluster.From(syncTarget)
if len(syncTarget.Spec.SupportedAPIExports) == 0 {
return []string{client.ToClusterAwareKey(clusterName.Path(), workloadv1alpha1.ImportedAPISExportName)}, nil
}

keys := make([]string, 0, len(syncTarget.Spec.SupportedAPIExports))
for _, export := range syncTarget.Spec.SupportedAPIExports {
path := export.Path
Expand Down

0 comments on commit 7644e05

Please sign in to comment.