Skip to content

Commit

Permalink
Add bind workload cmd
Browse files Browse the repository at this point in the history
Signed-off-by: Jian Qiu <jqiu@redhat.com>
  • Loading branch information
qiujian16 committed Oct 25, 2022
1 parent 2459721 commit 7ba6371
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 11 deletions.
27 changes: 27 additions & 0 deletions pkg/cliplugins/bind/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ var (
# Create an APIBinding named "my-binding" that binds to the APIExport "my-export" in the "root:my-service" workspace.
%[1]s bind apiexport root:my-service:my-export --name my-binding
`

bindWorkloadExampleUses = `
# Create a placement to deploy workload to synctargets in the workspace "root:compute".
%[1]s bind workload root:compute
`
)

func New(streams genericclioptions.IOStreams) *cobra.Command {
Expand Down Expand Up @@ -65,5 +70,27 @@ func New(streams genericclioptions.IOStreams) *cobra.Command {
bindOpts.BindFlags(bindCmd)

cmd.AddCommand(bindCmd)

bindWorkloadOpts := plugin.NewBindWorkloadOptions(streams)
bindWorkloadCmd := &cobra.Command{
Use: "workload <compute workspace>",
Short: "Bind to compute workspace",
Example: fmt.Sprintf(bindWorkloadExampleUses, "kubectl kcp"),
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if err := bindWorkloadOpts.Complete(args); err != nil {
return err
}

if err := bindWorkloadOpts.Validate(); err != nil {
return err
}

return bindWorkloadOpts.Run(cmd.Context())
},
}
bindWorkloadOpts.BindFlags(bindWorkloadCmd)

cmd.AddCommand(bindWorkloadCmd)
return cmd
}
295 changes: 295 additions & 0 deletions pkg/cliplugins/bind/plugin/bind_compute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
/*
Copyright 2022 The KCP Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugin

import (
"context"
"crypto/sha256"
"fmt"
"path"
"strings"

"github.com/kcp-dev/logicalcluster/v2"
"github.com/martinlindhe/base36"
"github.com/spf13/cobra"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/rest"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
schedulingv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/scheduling/v1alpha1"
kcpclient "github.com/kcp-dev/kcp/pkg/client/clientset/versioned"
"github.com/kcp-dev/kcp/pkg/cliplugins/base"
"github.com/kcp-dev/kcp/pkg/cliplugins/helpers"
)

type BindWorkloadOptions struct {
*base.Options

// APIExports is a list of APIExport to use in the workspace.
APIExports []string

// Namespace selector is a label selector to select namespace for the workload.
NamespaceSelector string

// LocationSelectors is a list of label selectors to select locations in the compute workspace.
LocationSelectors []string

// ComputeWorkspace is the workspace for synctarget
ComputeWorkspace logicalcluster.Name
}

func NewBindWorkloadOptions(streams genericclioptions.IOStreams) *BindWorkloadOptions {
return &BindWorkloadOptions{
Options: base.NewOptions(streams),
}
}

// BindFlags binds fields SyncOptions as command line flags to cmd's flagset.
func (o *BindWorkloadOptions) BindFlags(cmd *cobra.Command) {
o.Options.BindFlags(cmd)

cmd.Flags().StringSliceVar(&o.APIExports, "apiexports", o.APIExports,
"APIExport to bind to this workspace for workload, each APIExoport should be in the format of <absolute_ref_to_workspace>:<apiexport>")
cmd.Flags().StringVar(&o.NamespaceSelector, "namespace-selector", o.NamespaceSelector, "Label select to select namespaces to create workload.")
cmd.Flags().StringSliceVar(&o.LocationSelectors, "location-selectors", o.LocationSelectors,
"A list of label selectors to select locations in the compute workspace to sync workload.")
}

// Complete ensures all dynamically populated fields are initialized.
func (o *BindWorkloadOptions) Complete(args []string) error {
if err := o.Options.Complete(); err != nil {
return err
}

if len(args[0]) != 1 {
return fmt.Errorf("a compute workspace should be specified")
}

o.ComputeWorkspace = logicalcluster.New(args[0])

// if APIExport is not set use global kubernetes APIExpor and kubernetes APIExport in compute workspace
if len(o.APIExports) == 0 {
o.APIExports = []string{
"root:compute:kubernetes",
fmt.Sprintf("%s:kubernetes", o.ComputeWorkspace.String()),
}
}

// select all ns if namespace selector is not set
if len(o.NamespaceSelector) == 0 {
o.NamespaceSelector = labels.Everything().String()
}

// select all locations is location selectos is not set
if len(o.LocationSelectors) == 0 {
o.LocationSelectors = []string{labels.Everything().String()}
}

return nil
}

// Validate validates the BindOptions are complete and usable.
func (o *BindWorkloadOptions) Validate() error {
return nil
}

// Run create a placement in the workspace linkind to the compute workspace
func (o *BindWorkloadOptions) Run(ctx context.Context) error {
config, err := o.ClientConfig.ClientConfig()
if err != nil {
return err
}
userWorkspaceKcpClient, err := kcpclient.NewForConfig(config)
if err != nil {
return fmt.Errorf("failed to create kcp client: %w", err)
}

// build config to connect to compute workspace
computeWorkspaceConfig := rest.CopyConfig(config)
url, _, err := helpers.ParseClusterURL(config.Host)
if err != nil {
return err
}

url.Path = path.Join(url.Path, o.ComputeWorkspace.Path())
computeWorkspaceConfig.Host = url.String()
computeWorkspaceKcpClient, err := kcpclient.NewForConfig(computeWorkspaceConfig)
if err != nil {
return fmt.Errorf("failed to create kcp client: %w", err)
}

err = o.hasSupportedSyncTargets(ctx, computeWorkspaceKcpClient)
if err != nil {
return err
}

err = o.applyPlacement(ctx, userWorkspaceKcpClient)
if err != nil {
return err
}

err = o.applyAPIBinding(ctx, userWorkspaceKcpClient)
if err != nil {
return err
}

return nil
}

func apiBindingName(clusterName logicalcluster.Name, apiExportName string) string {
hash := sha256.Sum224([]byte(clusterName.Path()))
base36hash := strings.ToLower(base36.EncodeBytes(hash[:]))
return fmt.Sprintf("%s-%s", apiExportName, base36hash[:8])
}

func (o *BindWorkloadOptions) applyAPIBinding(ctx context.Context, client kcpclient.Interface) error {
apiBindings, err := client.ApisV1alpha1().APIBindings().List(ctx, metav1.ListOptions{})
if err != nil {
return err
}

desiredAPIExports := sets.NewString(o.APIExports...)
existingAPIExports := sets.NewString()
for _, binding := range apiBindings.Items {
if binding.Spec.Reference.Workspace == nil {
continue
}
existingAPIExports.Insert(fmt.Sprintf("%s:%s", binding.Spec.Reference.Workspace.Path, binding.Spec.Reference.Workspace.ExportName))
}

diff := desiredAPIExports.Difference(existingAPIExports)
var errs []error
for export := range diff {
lclusterName, name := logicalcluster.New(export).Split()
apiBinding := &apisv1alpha1.APIBinding{
ObjectMeta: metav1.ObjectMeta{
Name: apiBindingName(lclusterName, name),
},
Spec: apisv1alpha1.APIBindingSpec{
Reference: apisv1alpha1.ExportReference{
Workspace: &apisv1alpha1.WorkspaceExportReference{
Path: lclusterName.String(),
ExportName: name,
},
},
},
}
_, err := client.ApisV1alpha1().APIBindings().Create(ctx, apiBinding, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
errs = append(errs, err)
}

_, err = fmt.Fprintf(o.Out, "apibinding %s for apiexport %s created.\n", apiBinding.Name, export)
if err != nil {
errs = append(errs, err)
}
}

return utilerrors.NewAggregate(errs)
}

// placement name is a hash of location selectors and ns selector, with location workspace name as the prefix
func (o *BindWorkloadOptions) placementName() string {
clusterName, name := o.ComputeWorkspace.Split()
hash := sha256.Sum224([]byte(o.NamespaceSelector + strings.Join(o.LocationSelectors, ",") + clusterName.Path()))
base36hash := strings.ToLower(base36.EncodeBytes(hash[:]))
return fmt.Sprintf("%s-%s", name, base36hash[:8])
}

func (o *BindWorkloadOptions) applyPlacement(ctx context.Context, client kcpclient.Interface) error {
nsSelector, err := metav1.ParseToLabelSelector(o.NamespaceSelector)
if err != nil {
return fmt.Errorf("namespace selector format not correct: %w", err)
}

var locationSelectors []metav1.LabelSelector
for _, locSelector := range o.LocationSelectors {
selector, err := metav1.ParseToLabelSelector(locSelector)
if err != nil {
return fmt.Errorf("location selector %s format not correct: %w", locSelector, err)
}
locationSelectors = append(locationSelectors, *selector)
}

placement := &schedulingv1alpha1.Placement{
ObjectMeta: metav1.ObjectMeta{
Name: o.placementName(),
},
Spec: schedulingv1alpha1.PlacementSpec{
NamespaceSelector: nsSelector,
LocationSelectors: locationSelectors,
LocationWorkspace: o.ComputeWorkspace.String(),
LocationResource: schedulingv1alpha1.GroupVersionResource{
Group: "workload.kcp.dev",
Version: "v1alpha1",
Resource: "synctargets",
},
},
}

_, err = client.SchedulingV1alpha1().Placements().Get(ctx, placement.Name, metav1.GetOptions{})
switch {
case errors.IsNotFound(err):
_, err := client.SchedulingV1alpha1().Placements().Create(ctx, placement, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
return err
}
case err != nil:
return err
}

_, err = fmt.Fprintf(o.Out, "placement %s created.\n", placement.Name)
return err
}

func (o *BindWorkloadOptions) hasSupportedSyncTargets(ctx context.Context, client kcpclient.Interface) error {
syncTargets, err := client.WorkloadV1alpha1().SyncTargets().List(ctx, metav1.ListOptions{})
if err != nil {
return err
}

currentExports := sets.NewString(o.APIExports...)

supportedExports := sets.NewString()
for _, syncTarget := range syncTargets.Items {
for _, apiExport := range syncTarget.Spec.SupportedAPIExports {
if apiExport.Workspace == nil {
continue
}

path := apiExport.Workspace.Path
// if path is not set, the apiexport is in the compute workspace
if len(path) == 0 {
path = o.ComputeWorkspace.String()
}
supportedExports.Insert(fmt.Sprintf("%s:%s", path, apiExport.Workspace.ExportName))
}
}

diff := currentExports.Difference(supportedExports)
if diff.Len() > 0 {
return fmt.Errorf("not all apiexports is supported by the synctargets in workspace %s: %s", o.ComputeWorkspace, strings.Join(diff.List(), ","))
}

return nil
}
14 changes: 5 additions & 9 deletions pkg/cliplugins/workload/plugin/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"time"

jsonpatch "github.com/evanphx/json-patch"
kcpcache "github.com/kcp-dev/apimachinery/pkg/cache"
"github.com/kcp-dev/logicalcluster/v2"
"github.com/martinlindhe/base36"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -111,7 +110,7 @@ func NewSyncOptions(streams genericclioptions.IOStreams) *SyncOptions {
QPS: 20,
Burst: 30,
APIImportPollInterval: 1 * time.Minute,
APIExports: []string{"root:compute|kubernetes"},
APIExports: []string{"root:compute:kubernetes"},
}
}

Expand All @@ -121,7 +120,7 @@ func (o *SyncOptions) BindFlags(cmd *cobra.Command) {

cmd.Flags().StringSliceVar(&o.ResourcesToSync, "resources", o.ResourcesToSync, "Resources to synchronize with kcp.")
cmd.Flags().StringSliceVar(&o.APIExports, "apiexports", o.APIExports,
"APIExport to be supported by the syncer, each APIExoport should be in the format of {workspace}|{export name}, "+
"APIExport to be supported by the syncer, each APIExoport should be in the format of <absolute_ref_to_workspace>:<apiexport>, "+
"e.g. root:compute|kubernetes is the kubernetes APIExport in root:compute workspace")
cmd.Flags().StringVar(&o.SyncerImage, "syncer-image", o.SyncerImage, "The syncer image to use in the syncer's deployment YAML. Images are published at https://github.com/kcp-dev/kcp/pkgs/container/kcp%2Fsyncer.")
cmd.Flags().IntVar(&o.Replicas, "replicas", o.Replicas, "Number of replicas of the syncer deployment.")
Expand Down Expand Up @@ -267,14 +266,9 @@ func getSyncerID(syncTarget *workloadv1alpha1.SyncTarget) string {
}

func (o *SyncOptions) applySyncTarget(ctx context.Context, kcpClient kcpclient.Interface, syncTargetName string) (*workloadv1alpha1.SyncTarget, error) {
syncTarget, err := kcpClient.WorkloadV1alpha1().SyncTargets().Get(ctx, syncTargetName, metav1.GetOptions{})

supportedAPIExports := make([]apisv1alpha1.ExportReference, len(o.APIExports))
for _, export := range o.APIExports {
lclusterName, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(export)
if err != nil {
return nil, fmt.Errorf("export %s format is not correct: %w", export, err)
}
lclusterName, name := logicalcluster.New(export).Split()
supportedAPIExports = append(supportedAPIExports, apisv1alpha1.ExportReference{
Workspace: &apisv1alpha1.WorkspaceExportReference{
ExportName: name,
Expand All @@ -292,6 +286,8 @@ func (o *SyncOptions) applySyncTarget(ctx context.Context, kcpClient kcpclient.I
})
}

syncTarget, err := kcpClient.WorkloadV1alpha1().SyncTargets().Get(ctx, syncTargetName, metav1.GetOptions{})

switch {
case apierrors.IsNotFound(err):
// Create the sync target that will serve as a point of coordination between
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestMultipleExports(t *testing.T) {
syncTargetName := fmt.Sprintf("synctarget-%d", +rand.Intn(1000000))
t.Logf("Creating a SyncTarget and syncer in %s", computeClusterName)
syncTarget := framework.NewSyncerFixture(t, source, computeClusterName,
framework.WithAPIExports(fmt.Sprintf("%s|%s", serviceSchemaClusterName.String(), serviceAPIExport.Name)),
framework.WithAPIExports(fmt.Sprintf("%s:%s", serviceSchemaClusterName.String(), serviceAPIExport.Name)),
framework.WithSyncTarget(computeClusterName, syncTargetName),
framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) {
if !isFakePCluster {
Expand Down
Loading

0 comments on commit 7ba6371

Please sign in to comment.