Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replicate logicalclusters for workspacetypes #16

Open
wants to merge 33 commits into
base: sttts-random-workspace-scheduling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
971a4dc
reconciler/workspace: schedule to random shard
sttts Jan 11, 2023
d146657
workloads: schedule root:compute to root to make bootstrapping work
sttts Jan 11, 2023
91aaa80
e2e/authorizer: remove misplaced t.Helper() call
sttts Jan 11, 2023
3bd6115
apis/tenancy: fix json tag for workspace.spec.location
sttts Jan 11, 2023
98d9f46
e2e/reconciler: rename clusterworkspaceshards -> shards
sttts Jan 12, 2023
da1a730
Makefile: add SHARDS parameter defaulting to 2
sttts Jan 12, 2023
d39eac6
reconciler/cache/reconciler: ignore system:* clusters
sttts Jan 12, 2023
7fce4f9
cache: replicate synctargets and placements
sttts Jan 12, 2023
6414714
e2e: unify how APIBindings are created
sttts Jan 13, 2023
e497571
reconciler/apis/replication: support * in ClusterRules
sttts Jan 13, 2023
784cd26
reconciler/apis/replication: consider ClusterRoles from system:admin
sttts Jan 13, 2023
d920a03
e2e: print workspace shards
sttts Jan 13, 2023
419a5a9
reconciler/apis/replication: some more logging
sttts Jan 13, 2023
bf8de65
admission/apibinding: wire cached informer
sttts Jan 13, 2023
cfd3907
reconciler/apis/replication: unification and plumbing cleanup
sttts Jan 13, 2023
f209dbd
reconciler/cache: generalize label controllers
sttts Jan 13, 2023
74f7675
reconciler/tenancy: add replicate controller for workspacetypes
sttts Jan 13, 2023
2822ae8
cache: replicate admission webhooks
sttts Jan 13, 2023
93d27be
admission/webhooks: wire global webhook configurations
sttts Jan 13, 2023
25db5a0
apis/tenancy/workspaces: add region label column
sttts Jan 17, 2023
93903d1
Makefile correctly pass SHARDS variable to test-e2e-sharded-minimal t…
p0lyn0mial Jan 13, 2023
5044670
e2e: kubeconfig for any shard
sttts Jan 23, 2023
e5861b4
e2e/apibindings: count * lists across all shards
sttts Jan 23, 2023
4e2c77d
Fix e2e compliance TestValidatingWebhooInWorkspace
fgiloux Jan 19, 2023
b928dd7
cache: replicate LogicalClusters for APIExport workspace and relevant…
sttts Jan 24, 2023
4ed2bf8
cache: wire cache client and cache informers into virtual workspaces
sttts Jan 24, 2023
9c88e9d
WIP: e2e/apibindings: fix virtual workspace testing
sttts Jan 24, 2023
f934427
reconciler/workspacetype_controller: assing shard.spec.VirtualWorkspa…
p0lyn0mial Jan 24, 2023
5730ea5
reconciler/tenancy/workspacetype: rename clusterworkspacetype to work…
p0lyn0mial Jan 24, 2023
fc7c5ce
reconciler/replicateclusterrole: replicate ClusterRoles for workspace…
p0lyn0mial Jan 24, 2023
222bbd5
reconciler/tenancy: replicate LogicalClusters for WorkspaceType
p0lyn0mial Jan 24, 2023
5c9044d
reconciler/api/replicatelogicalcluster: rename replicateclusterrole t…
p0lyn0mial Jan 24, 2023
95f172b
reconciler/cache/labellogicalcluster: fix comments and log messages
p0lyn0mial Jan 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ endif
test-e2e-sharded: TEST_ARGS ?=
test-e2e-sharded: WHAT ?= ./test/e2e...
test-e2e-sharded: WORK_DIR ?= .
test-e2e-sharded: SHARDS ?= 2
ifdef ARTIFACT_DIR
test-e2e-sharded: LOG_DIR ?= $(ARTIFACT_DIR)/kcp
else
Expand All @@ -316,12 +317,13 @@ test-e2e-sharded: require-kind build-all build-kind-images
kind get kubeconfig > "$(WORK_DIR)/.kcp/kind.kubeconfig"
rm -f "$(WORK_DIR)/.kcp/admin.kubeconfig"
UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 \
./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=2 2>&1 & PID=$$!; echo "PID $$PID" && \
./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=$(SHARDS) 2>&1 & PID=$$!; echo "PID $$PID" && \
trap 'kill -TERM $$PID' TERM INT EXIT && \
while [ ! -f "$(WORK_DIR)/.kcp/admin.kubeconfig" ]; do sleep 1; done && \
NO_GORUN=1 GOOS=$(OS) GOARCH=$(ARCH) \
$(GO_TEST) -race $(COUNT_ARG) $(PARALLELISM_ARG) $(WHAT) $(TEST_ARGS) \
-args --use-default-kcp-server --root-shard-kubeconfig=$(PWD)/.kcp-0/admin.kubeconfig $(SUITES_ARG) \
-args --use-default-kcp-server --shard-kubeconfigs=root=$(PWD)/.kcp-0/admin.kubeconfig$(shell if [ $(SHARDS) -gt 1 ]; then seq 1 $$[$(SHARDS) - 1]; fi | while read n; do echo -n ",shard-$$n=$(PWD)/.kcp-$$n/admin.kubeconfig"; done) \
$(SUITES_ARG) \
--syncer-image="$(SYNCER_IMAGE)" --kcp-test-image="$(TEST_IMAGE)" --pcluster-kubeconfig="$(abspath $(WORK_DIR)/.kcp/kind.kubeconfig)" \
$(if $(value WAIT),|| { echo "Terminated with $$?"; wait "$$PID"; },)

Expand All @@ -332,6 +334,7 @@ endif
test-e2e-sharded-minimal: TEST_ARGS ?=
test-e2e-sharded-minimal: WHAT ?= ./test/e2e...
test-e2e-sharded-minimal: WORK_DIR ?= .
test-e2e-sharded-minimal: SHARDS ?= 2
ifdef ARTIFACT_DIR
test-e2e-sharded-minimal: LOG_DIR ?= $(ARTIFACT_DIR)/kcp
else
Expand All @@ -340,11 +343,12 @@ endif
test-e2e-sharded-minimal: build-all
mkdir -p "$(LOG_DIR)" "$(WORK_DIR)/.kcp"
rm -f "$(WORK_DIR)/.kcp/admin.kubeconfig"
UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 ./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=2 2>&1 & PID=$$!; echo "PID $$PID" && \
UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 ./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=$(SHARDS) 2>&1 & PID=$$!; echo "PID $$PID" && \
trap 'kill -TERM $$PID' TERM INT EXIT && \
while [ ! -f "$(WORK_DIR)/.kcp/admin.kubeconfig" ]; do sleep 1; done && \
NO_GORUN=1 GOOS=$(OS) GOARCH=$(ARCH) $(GO_TEST) -race $(COUNT_ARG) $(PARALLELISM_ARG) $(WHAT) $(TEST_ARGS) \
-args --use-default-kcp-server --root-shard-kubeconfig=$(PWD)/.kcp-0/admin.kubeconfig $(SUITES_ARGS) \
-args --use-default-kcp-server --shard-kubeconfigs=root=$(PWD)/.kcp-0/admin.kubeconfig$(shell if [ $(SHARDS) -gt 1 ]; then seq 1 $$[$(SHARDS) - 1]; fi | while read n; do echo -n ",shard-$$n=$(PWD)/.kcp-$$n/admin.kubeconfig"; done) \
$(SUITES_ARGS) \
$(if $(value WAIT),|| { echo "Terminated with $$?"; wait "$$PID"; },)

.PHONY: test
Expand Down
3 changes: 2 additions & 1 deletion cmd/apigen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/apis/admissionregistration"
"sigs.k8s.io/yaml"

"github.com/kcp-dev/kcp/pkg/apis/apis"
Expand Down Expand Up @@ -172,7 +173,7 @@ func loadCustomResourceDefinitions(logger logr.Logger, baseDir string) (map[meta
Group: parts[0],
Resource: parts[1],
}
if gr.Group == apis.GroupName || gr.Group == rbacv1.GroupName {
if gr.Group == apis.GroupName || gr.Group == rbacv1.GroupName || gr.Group == admissionregistration.GroupName {
logger.Info(fmt.Sprintf("Skipping CustomResourceDefinition %s from %s", gr.String(), path))
return nil
}
Expand Down
85 changes: 84 additions & 1 deletion cmd/sharded-test-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ import (
"path/filepath"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
machineryutilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
kuser "k8s.io/apiserver/pkg/authentication/user"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"

"github.com/kcp-dev/kcp/cmd/sharded-test-server/third_party/library-go/crypto"
shard "github.com/kcp-dev/kcp/cmd/test-server/kcp"
"github.com/kcp-dev/kcp/pkg/apis/core"
"github.com/kcp-dev/kcp/pkg/authorization/bootstrap"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
)

func main() {
Expand Down Expand Up @@ -209,7 +215,7 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb
vwPort = "7444"

for i := 0; i < numberOfShards; i++ {
vw, err := newVirtualWorkspace(ctx, i, servingCA, hostIP.String(), logDirPath, workDirPath, clientCA)
vw, err := newVirtualWorkspace(ctx, i, servingCA, hostIP.String(), logDirPath, workDirPath, clientCA, cacheServerConfigPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -262,6 +268,37 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb
return err
}

// Label region of shards
clientConfig, err := loadKubeConfig(filepath.Join(workDirPath, ".kcp/admin.kubeconfig"), "base")
if err != nil {
return err
}
config, err := clientConfig.ClientConfig()
if err != nil {
return err
}
client, err := kcpclientset.NewForConfig(config)
if err != nil {
return err
}
for i := range shards {
name := fmt.Sprintf("shard-%d", i)
if i == 0 {
name = "root"
}

if i >= len(regions) {
break
}
patch := fmt.Sprintf(`{"metadata":{"labels":{"region":%q}}}`, regions[i])
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
_, err := client.Cluster(core.RootCluster.Path()).CoreV1alpha1().Shards().Patch(ctx, name, types.MergePatchType, []byte(patch), metav1.PatchOptions{})
return err
}); err != nil {
return err
}
}

select {
case shardIndexErr := <-shardsErrCh:
return fmt.Errorf("shard %d exited: %w", shardIndexErr.index, shardIndexErr.error)
Expand All @@ -278,3 +315,49 @@ type indexErrTuple struct {
index int
error error
}

func loadKubeConfig(kubeconfigPath, contextName string) (clientcmd.ClientConfig, error) {
fs, err := os.Stat(kubeconfigPath)
if err != nil {
return nil, err
}
if fs.Size() == 0 {
return nil, fmt.Errorf("%s points to an empty file", kubeconfigPath)
}

rawConfig, err := clientcmd.LoadFromFile(kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed to load admin kubeconfig: %w", err)
}

return clientcmd.NewNonInteractiveClientConfig(*rawConfig, contextName, nil, nil), nil
}

var regions = []string{
"us-east-2",
"us-east-1",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-2",
"ap-southeast-3",
"ap-south-1",
"ap-northeast-3",
"ap-northeast-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-northeast-1",
"ca-central-1",
"eu-central-1",
"eu-west-1",
"eu-west-2",
"eu-south-1",
"eu-west-3",
"eu-south-2",
"eu-north-1",
"eu-central-2",
"me-south-1",
"me-central-1",
"sa-east-1",
}
2 changes: 1 addition & 1 deletion cmd/sharded-test-server/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newShard(ctx context.Context, n int, args []string, standaloneVW bool, serv
fmt.Sprintf("--shard-virtual-workspace-ca-file=%s", filepath.Join(workDirPath, ".kcp", "serving-ca.crt")),
)
if len(cacheServerConfigPath) > 0 {
args = append(args, fmt.Sprintf("--cache-server-kubeconfig-file=%s", cacheServerConfigPath))
args = append(args, fmt.Sprintf("--cache-kubeconfig=%s", cacheServerConfigPath))
}

if standaloneVW {
Expand Down
3 changes: 2 additions & 1 deletion cmd/sharded-test-server/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type VirtualWorkspace struct {
writer headWriter
}

func newVirtualWorkspace(ctx context.Context, index int, servingCA *crypto.CA, hostIP string, logDirPath, workDirPath string, clientCA *crypto.CA) (*VirtualWorkspace, error) {
func newVirtualWorkspace(ctx context.Context, index int, servingCA *crypto.CA, hostIP string, logDirPath, workDirPath string, clientCA *crypto.CA, cacheServerConfigPath string) (*VirtualWorkspace, error) {
logger := klog.FromContext(ctx)

// create serving cert
Expand Down Expand Up @@ -131,6 +131,7 @@ func newVirtualWorkspace(ctx context.Context, index int, servingCA *crypto.CA, h
args := []string{}
args = append(args,
fmt.Sprintf("--kubeconfig=%s", kubeconfigPath),
fmt.Sprintf("--cache-kubeconfig=%s", cacheServerConfigPath),
fmt.Sprintf("--authentication-kubeconfig=%s", authenticationKubeconfigPath),
fmt.Sprintf("--client-ca-file=%s", clientCAFilePath),
fmt.Sprintf("--tls-private-key-file=%s", servingKeyFile),
Expand Down
18 changes: 17 additions & 1 deletion cmd/virtual-workspaces/command/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ func Run(ctx context.Context, o *options.Options) error {
return err
}

// parse cache kubeconfig
defaultCacheClientConfig, err := kubeConfig.ClientConfig()
if err != nil {
return err
}
cacheConfig, err := o.Cache.RestConfig(defaultCacheClientConfig)
if err != nil {
return err
}
cacheKcpClusterClient, err := kcpclientset.NewForConfig(cacheConfig)
if err != nil {
return err
}

// Don't throttle
nonIdentityConfig.QPS = -1

Expand Down Expand Up @@ -127,14 +141,15 @@ func Run(ctx context.Context, o *options.Options) error {
return err
}
wildcardKcpInformers := kcpinformers.NewSharedInformerFactory(kcpClusterClient, 10*time.Minute)
cacheKcpInformers := kcpinformers.NewSharedInformerFactory(cacheKcpClusterClient, 10*time.Minute)

if o.ProfilerAddress != "" {
//nolint:errcheck,gosec
go http.ListenAndServe(o.ProfilerAddress, nil)
}

// create apiserver
virtualWorkspaces, err := o.VirtualWorkspaces.NewVirtualWorkspaces(identityConfig, o.RootPathPrefix, wildcardKubeInformers, wildcardKcpInformers)
virtualWorkspaces, err := o.VirtualWorkspaces.NewVirtualWorkspaces(identityConfig, o.RootPathPrefix, wildcardKubeInformers, wildcardKcpInformers, cacheKcpInformers)
if err != nil {
return err
}
Expand All @@ -157,6 +172,7 @@ func Run(ctx context.Context, o *options.Options) error {
rootAPIServerConfig, err := virtualrootapiserver.NewRootAPIConfig(recommendedConfig, []virtualrootapiserver.InformerStart{
wildcardKubeInformers.Start,
wildcardKcpInformers.Start,
cacheKcpInformers.Start,
}, virtualWorkspaces)
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions cmd/virtual-workspaces/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/component-base/logs"

cacheoptions "github.com/kcp-dev/kcp/pkg/cache/client/options"
virtualworkspacesoptions "github.com/kcp-dev/kcp/pkg/virtual/options"
)

Expand All @@ -44,6 +45,7 @@ type Options struct {
Context string
RootPathPrefix string

Cache cacheoptions.Cache
SecureServing genericapiserveroptions.SecureServingOptions
Authentication genericapiserveroptions.DelegatingAuthenticationOptions
Authorization virtualworkspacesoptions.Authorization
Expand All @@ -61,6 +63,7 @@ func NewOptions() *Options {

RootPathPrefix: DefaultRootPathPrefix,

Cache: *cacheoptions.NewCache(),
SecureServing: *genericapiserveroptions.NewSecureServingOptions(),
Authentication: *genericapiserveroptions.NewDelegatingAuthenticationOptions(),
Authorization: *virtualworkspacesoptions.NewAuthorization(),
Expand All @@ -79,6 +82,7 @@ func NewOptions() *Options {
}

func (o *Options) AddFlags(flags *pflag.FlagSet) {
o.Cache.AddFlags(flags)
o.SecureServing.AddFlags(flags)
o.Authentication.AddFlags(flags)
o.Logs.AddFlags(flags)
Expand All @@ -94,6 +98,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {

func (o *Options) Validate() error {
errs := []error{}
errs = append(errs, o.Cache.Validate()...)
errs = append(errs, o.SecureServing.Validate()...)
errs = append(errs, o.Authentication.Validate()...)
errs = append(errs, o.VirtualWorkspaces.Validate()...)
Expand Down
Loading