Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions deploy/kubernetes/operator/cmd/controller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"flag"

"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/controller"
)

func main() {
klog.InitFlags(nil)
cfg := &config.Config{}
cfg.AddFlags()
flag.Parse()

cfg.Complete()
klog.Infof("run config: %+v", cfg)

// create a manager for leader election.
mgr, err := ctrl.NewManager(cfg.RESTConfig, ctrl.Options{
LeaderElection: true,
LeaderElectionID: cfg.LeaderElectionID(),
})
if err != nil {
klog.Fatal(err)
}
// create a rss controller.
rc := controller.NewRSSController(cfg)
if err = mgr.Add(rc); err != nil {
klog.Fatal(err)
}
// start the rss controller.
if err = mgr.Start(cfg.RunCtx); err != nil {
klog.Fatal(err)
}
}
14 changes: 8 additions & 6 deletions deploy/kubernetes/operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ module github.com/apache/incubator-uniffle/deploy/kubernetes/operator
go 1.16

require (
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0
github.com/onsi/gomega v1.20.0
github.com/parnurzeal/gorequest v0.2.16
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gomodules.xyz/jsonpatch/v2 v2.2.0
k8s.io/api v0.22.1
k8s.io/apimachinery v0.22.1
k8s.io/client-go v0.22.1
k8s.io/code-generator v0.22.1
k8s.io/api v0.22.2
k8s.io/apimachinery v0.22.2
k8s.io/client-go v0.22.2
k8s.io/code-generator v0.22.2
k8s.io/klog/v2 v2.9.0
k8s.io/utils v0.0.0-20210802155522-efc7438f0176
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
moul.io/http2curl v1.0.0 // indirect
sigs.k8s.io/controller-runtime v0.10.0
)
56 changes: 36 additions & 20 deletions deploy/kubernetes/operator/go.sum

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions deploy/kubernetes/operator/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

import (
"flag"

"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
)

const (
flagWorkers = "workers"
)

// Config contains all configurations.
type Config struct {
Workers int
utils.GenericConfig
}

// LeaderElectionID returns leader election ID.
func (c *Config) LeaderElectionID() string {
return "rss-controller-" + constants.LeaderIDSuffix
}

// AddFlags adds all configurations to the global flags.
func (c *Config) AddFlags() {
flag.IntVar(&c.Workers, flagWorkers, 1, "Concurrency of the rss controller.")
c.GenericConfig.AddFlags()
}

// Complete is called before rss-controller runs.
func (c *Config) Complete() {
c.GenericConfig.Complete()
}
83 changes: 83 additions & 0 deletions deploy/kubernetes/operator/pkg/controller/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package constants

const (
// ContainerShuffleServerRPCPort indicates rpc port used in shuffle server containers.
ContainerShuffleServerRPCPort int32 = 19997
// ContainerShuffleServerHTTPPort indicates http port used in shuffle server containers.
ContainerShuffleServerHTTPPort int32 = 19996
// ContainerCoordinatorRPCPort indicates rpc port used in coordinator containers.
ContainerCoordinatorRPCPort int32 = 19997
// ContainerCoordinatorHTTPPort indicates http port used in coordinator containers.
ContainerCoordinatorHTTPPort int32 = 19996

// ShuffleServerRPCPortEnv indicates environment name of rpc port used by shuffle servers.
ShuffleServerRPCPortEnv = "SERVER_RPC_PORT"
// ShuffleServerHTTPPortEnv indicates environment name of http port used by shuffle servers.
ShuffleServerHTTPPortEnv = "SERVER_HTTP_PORT"
// CoordinatorRPCPortEnv indicates environment name of rpc port used by coordinators.
CoordinatorRPCPortEnv = "COORDINATOR_RPC_PORT"
// CoordinatorHTTPPortEnv indicates environment name of http port used by coordinators.
CoordinatorHTTPPortEnv = "COORDINATOR_HTTP_PORT"
// RSSCoordinatorQuorumEnv indicates environment name of rss coordinator quorum used by shuffle servers.
RSSCoordinatorQuorumEnv = "RSS_COORDINATOR_QUORUM"
// XmxSizeEnv indicates environment name of xmx size used by coordinators or shuffle servers.
XmxSizeEnv = "XMX_SIZE"
// ServiceNameEnv indicates environment name of service name used by coordinators or shuffle servers.
ServiceNameEnv = "SERVICE_NAME"
// NodeNameEnv indicates environment name of physical node name used by coordinators or shuffle servers.
NodeNameEnv = "NODE_NAME"
// RssIPEnv indicates environment name of shuffle servers' ip addresses.
RssIPEnv = "RSS_IP"

// CoordinatorServiceName defines environment variable value of "SERVICE_NAME" used by coordinators.
CoordinatorServiceName = "coordinator"
// ShuffleServerServiceName defines environment variable value of "SERVICE_NAME" used by shuffle servers.
ShuffleServerServiceName = "server"

// ExcludeNodesFile indicates volume mounting name of exclude nodes file
ExcludeNodesFile = "exclude-nodes-file"

// UpdateStatusError means reason of updating status of rss error
UpdateStatusError = "UpdateStatusError"

// OwnerLabel is the label of configMap's owner.
OwnerLabel = "uniffle.apache.org/owner-label"

// ConfigurationVolumeName is the name of configMap volume records configuration of coordinators or shuffle servers.
ConfigurationVolumeName = "configuration"
)

// PropertyKey defines property key in configuration of coordinators or shuffle servers.
type PropertyKey string

const (
// RPCServerPort represent rss port property in configuration of coordinators or shuffle servers.
RPCServerPort PropertyKey = "rss.rpc.server.port"
// JettyHTTPPort represent http port property in configuration of coordinators or shuffle servers.
JettyHTTPPort PropertyKey = "rss.jetty.http.port"

// CoordinatorQuorum represent coordinator quorum property in configuration of shuffle servers.
CoordinatorQuorum PropertyKey = "rss.coordinator.quorum"
// StorageBasePath represent storage base path property in configuration of shuffle servers.
StorageBasePath PropertyKey = "rss.storage.basePath"

// CoordinatorExcludeNodesPath represent exclude nodes path property in configuration of coordinators.
CoordinatorExcludeNodesPath PropertyKey = "rss.coordinator.exclude.nodes.file.path"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package controller

import (
"context"
"reflect"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubefake "k8s.io/client-go/kubernetes/fake"

unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned/fake"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
)

func buildEmptyPhaseRssObj() *unifflev1alpha1.RemoteShuffleService {
return &unifflev1alpha1.RemoteShuffleService{
ObjectMeta: metav1.ObjectMeta{
Name: testRssName,
Namespace: testNamespace,
ResourceVersion: "test",
},
Spec: unifflev1alpha1.RemoteShuffleServiceSpec{
Coordinator: &unifflev1alpha1.CoordinatorConfig{
ExcludeNodesFilePath: "/exclude_nodes",
},
},
Status: unifflev1alpha1.RemoteShuffleServiceStatus{},
}
}

func TestProcessEmptyPhaseRss(t *testing.T) {
rss := buildEmptyPhaseRssObj()

rssClient := fake.NewSimpleClientset(rss)
kubeClient := kubefake.NewSimpleClientset()

rc := newRSSController(&config.Config{
GenericConfig: utils.GenericConfig{
KubeClient: kubeClient,
RSSClient: rssClient,
},
})

for _, tt := range []struct {
name string
expectedRssStatus unifflev1alpha1.RemoteShuffleServiceStatus
expectedNeedRetry bool
expectedError error
}{
{
name: "process rss object which has just been created, and whose status phase is empty",
expectedRssStatus: unifflev1alpha1.RemoteShuffleServiceStatus{
Phase: unifflev1alpha1.RSSPending,
},
expectedNeedRetry: false,
},
} {
needRetry, err := rc.processNormal(rss)
if err != nil {
t.Errorf("process rss object failed: %v", err)
return
}
if needRetry != tt.expectedNeedRetry {
t.Errorf("unexpected result indicates whether to retrys: %v, expected: %v",
needRetry, tt.expectedNeedRetry)
return
}
updatedRss, getErr := rssClient.UniffleV1alpha1().RemoteShuffleServices(rss.Namespace).
Get(context.TODO(), rss.Name, metav1.GetOptions{})
if getErr != nil {
t.Errorf("get updated rss object failed: %v", err)
return
}
if !reflect.DeepEqual(updatedRss.Status, tt.expectedRssStatus) {
t.Errorf("unexpected status of updated rss object: %+v, expected: %+v",
updatedRss.Status, tt.expectedRssStatus)
return
}
}
}
Loading