Skip to content

Commit

Permalink
feature: 支持容器扩展资源调度机制 TencentBlueKing#424
Browse files Browse the repository at this point in the history
  • Loading branch information
zmberg committed Apr 7, 2020
1 parent d5cc7bf commit 170bf62
Show file tree
Hide file tree
Showing 74 changed files with 1,216 additions and 1,379 deletions.
25 changes: 12 additions & 13 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ PACKAGEPATH=./build/bcs.${VERSION}
EXPORTPATH=./build/api_export

# options
default:api dns health client storage check executor mesos-driver mesos-watch scheduler loadbalance metricservice metriccollector exporter k8s-watch kube-agent k8s-driver api-export netservice sd-prometheus process-executor process-daemon bmsf-mesos-adapter hpacontroller kube-sche consoleproxy clb-controller gw-controller logbeat-sidecar csi-cbs bcs-webhook-server k8s-statefulsetplus network detection
specific:api dns health client storage check executor mesos-driver mesos-watch scheduler loadbalance metricservice metriccollector exporter k8s-watch kube-agent k8s-driver api-export netservice sd-prometheus process-executor process-daemon bmsf-mesos-adapter hpacontroller kube-sche consoleproxy clb-controller gw-controller logbeat-sidecar csi-cbs bcs-webhook-server k8s-statefulsetplus network detection
default:api dns health client storage check executor mesos-driver mesos-watch scheduler loadbalance metricservice metriccollector exporter k8s-watch kube-agent k8s-driver api-export netservice sd-prometheus process-executor process-daemon bmsf-mesos-adapter hpacontroller kube-sche consoleproxy clb-controller gw-controller logbeat-sidecar csi-cbs bcs-webhook-server k8s-statefulsetplus network detection cpuset
specific:api dns health client storage check executor mesos-driver mesos-watch scheduler loadbalance metricservice metriccollector exporter k8s-watch kube-agent k8s-driver api-export netservice sd-prometheus process-executor process-daemon bmsf-mesos-adapter hpacontroller kube-sche consoleproxy clb-controller gw-controller logbeat-sidecar csi-cbs bcs-webhook-server k8s-statefulsetplus network detection cpuset
k8s:api client storage k8s-watch kube-agent k8s-driver csi-cbs kube-sche k8s-statefulsetplus

allpack: svcpack k8spack mmpack mnpack
Expand Down Expand Up @@ -248,6 +248,10 @@ clb-controller:pre
cp -R ./install/conf/bcs-services/bcs-clb-controller ${PACKAGEPATH}/bcs-services
go build ${LDFLAG} -o ${PACKAGEPATH}/bcs-services/bcs-clb-controller/bcs-clb-controller ./bcs-services/bcs-clb-controller/main.go

cpuset:pre
mkdir -p ${PACKAGEPATH}/bcs-services/bcs-cpuset-device
go build ${LDFLAG} -o ${PACKAGEPATH}/bcs-services/bcs-cpuset-device/bcs-cpuset-device ./bcs-services/bcs-cpuset-device/main.go

gw-controller:pre
mkdir -p ${PACKAGEPATH}/bcs-services/bcs-gw-controller
cp -R ./install/conf/bcs-services/bcs-gw-controller ${PACKAGEPATH}/bcs-services
Expand Down
7 changes: 7 additions & 0 deletions bcs-common/common/types/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,10 @@ const (
HttpMethod_DELETE = "DELETE"
HttpMethod_PATCH = "PATCH"
)

type APIResponse struct {
Result bool `json:"result"`
Code int `json:"code"`
Data interface{} `json:"data"`
Message string `json:"message"`
}
5 changes: 3 additions & 2 deletions bcs-common/common/types/rc.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ type VolumeUnit struct {

// ResourceRequirements describes the compute resource requirement
type ResourceRequirements struct {
Limits ResourceList `json:"limits,omitempty"`
Requests ResourceList `json:"requests,omitempty"`
Limits ResourceList `json:"limits,omitempty"`
Requests ResourceList `json:"requests,omitempty"`
Externals ExternalResource `json:"externals,omitempty"`
}

type ResourceList struct {
Expand Down
15 changes: 15 additions & 0 deletions bcs-common/common/types/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,27 @@ type BcsClusterAgentSetting struct {
NoSchedule map[string]string `json:"noSchedule"`
//Pods index
Pods []string `json:"pods"`
//External Resources, key=ExternalResource.Name
ExternalResources map[string]ExternalResource
// Populated by the system.
// Read-only.
// Value must be treated as opaque by clients and .
ResourceVersion string `json:"-"`
}

type ExternalResource struct {
//InnerIP, agent ip
InnerIP string
//external resource name, example: bkbcs/cpuset
Name string
//Value
Value float64
//Capacity
Capacity float64
//device plugin socket address, exmaple: /data/bcs/cpuset.socket
Socket string
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BcsClusterAgentSetting) DeepCopyInto(out *BcsClusterAgentSetting) {
*out = *in
Expand Down
139 changes: 139 additions & 0 deletions bcs-common/pkg/mesosdriver/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package mesosdriver

import (
"encoding/json"
"fmt"
"net/http"

"bk-bcs/bcs-common/common/blog"
"bk-bcs/bcs-common/common/http/httpclient"
commtypes "bk-bcs/bcs-common/common/types"
moduleDiscovery "bk-bcs/bcs-common/pkg/module-discovery"
)

type MesosDriverClient struct {
conf *Config

//to discovery mesos driver address
moduleDiscovery moduleDiscovery.ModuleDiscovery
//http client
cli *httpclient.HttpClient
}

// new MesosPlatform object
func NewMesosPlatform(conf *Config) (*MesosDriverClient, error) {
m := &MesosDriverClient{
conf: conf,
}

var err error
//start module discovery to discovery mesos driver
//moduleDiscovery.GetModuleServers(module) can return server information
m.moduleDiscovery, err = moduleDiscovery.NewDiscoveryV2(m.conf.ZkAddr, []string{commtypes.BCS_MODULE_MESOSAPISERVER})
if err != nil {
return nil, err
}
blog.Infof("NewDiscoveryV2 done")

//init http client
m.cli = httpclient.NewHttpClient()
//if https
if m.conf.ClientCert.IsSSL {
blog.Infof("NetworkDetection http client cert ssl")
m.cli.SetTlsVerity(m.conf.ClientCert.CAFile, m.conf.ClientCert.CertFile, m.conf.ClientCert.KeyFile,
m.conf.ClientCert.CertPasswd)
}
m.cli.SetHeader("Content-Type", "application/json")
m.cli.SetHeader("Accept", "application/json")
return m, nil
}

//get module address
//clusterid, example BCS-MESOS-10001
//return first parameter: module address, example 127.0.0.1:8090
func (m *MesosDriverClient) getModuleAddr(clusterid string) (string, error) {
serv, err := m.moduleDiscovery.GetRandModuleServer(commtypes.BCS_MODULE_MESOSAPISERVER)
if err != nil {
blog.Errorf("discovery zk %s module %s error %s", m.conf.ZkAddr, commtypes.BCS_MODULE_MESOSAPISERVER, err.Error())
return "", err
}
//serv is string object
data, _ := serv.(string)
var servInfo *commtypes.BcsMesosApiserverInfo
err = json.Unmarshal([]byte(data), &servInfo)
if err != nil {
blog.Errorf("getModuleAddr Unmarshal data(%s) to commtypes.BcsMesosApiserverInfo failed: %s", data, err.Error())
return "", err
}

return fmt.Sprintf("%s://%s:%d", servInfo.Scheme, servInfo.IP, servInfo.Port), nil
}

//update agent external resources
func (m *MesosDriverClient) UpdateAgentExternalResources(er *commtypes.ExternalResource) error {
_, err := m.requestMesosApiserver(m.conf.ClusterId, http.MethodPut, "agentsettings/externalresources", nil)
if err != nil {
blog.Errorf("update agent %s external resources error %s", er.InnerIP, err.Error())
return err
}
blog.Infof("update agent %s external resources %s success", er.InnerIP, er.Name)
return nil
}

//method=http.method: POST、GET、PUT、DELETE
//request url = address/url
//payload is request body
//if error!=nil, then request mesos failed, errom.Error() is failed message
//if error==nil, []byte is response body information
func (m *MesosDriverClient) requestMesosApiserver(clusterid, method, url string, payload []byte) ([]byte, error) {
//get mesos api address
addr, err := m.getModuleAddr(clusterid)
if err != nil {
return nil, fmt.Errorf("get cluster %s mesosapi failed: %s", clusterid, err.Error())
}
uri := fmt.Sprintf("%s/mesosdriver/v4/%s", addr, url)
m.cli.SetHeader("BCS-ClusterID", clusterid)

var by []byte
switch method {
case "GET":
by, err = m.cli.GET(uri, nil, payload)
case "POST":
by, err = m.cli.POST(uri, nil, payload)
case "DELETE":
by, err = m.cli.DELETE(uri, nil, payload)
case "PUT":
by, err = m.cli.PUT(uri, nil, payload)
default:
err = fmt.Errorf("uri %s method %s is invalid", uri, method)
}
if err != nil {
return nil, err
}

//unmarshal response.body
var result *commtypes.APIResponse
err = json.Unmarshal(by, &result)
if err != nil {
return nil, fmt.Errorf("Unmarshal body(%s) failed: %s", string(by), err.Error())
}
//if result.Result==false, then request failed
if !result.Result {
return nil, fmt.Errorf("request %s failed: %s", uri, result.Message)
}
by, _ = json.Marshal(result.Data)
return by, nil
}
31 changes: 31 additions & 0 deletions bcs-common/pkg/mesosdriver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package mesosdriver

//CertConfig is configuration of Cert
type CertConfig struct {
CAFile string
CertFile string
KeyFile string
CertPasswd string
IsSSL bool
}

type Config struct {
ZkAddr string
//http client cert config
ClientCert *CertConfig
//clusterid
ClusterId string
}
10 changes: 6 additions & 4 deletions bcs-common/pkg/module-discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ func (r *DiscoveryV2) GetModuleServers(moduleName string) ([]interface{}, error)
r.RLock()
defer r.RUnlock()

servs, ok := r.servers[moduleName]
if !ok {
return nil, fmt.Errorf("Module %s not found", moduleName)
servs := make([]interface{}, 0)
for k, v := range r.servers {
blog.Infof("k")
if strings.Contains(k, moduleName) {
servs = append(servs, v...)
}
}

if len(servs) == 0 {
return nil, fmt.Errorf("Module %s don't have endpoints", moduleName)
}
Expand Down
18 changes: 8 additions & 10 deletions bcs-mesos/bcs-container-executor/container/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@ package container

import (
"archive/tar"
"bk-bcs/bcs-mesos/bcs-container-executor/util"
"bytes"
"fmt"
"io"
"math"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"

Expand Down Expand Up @@ -352,15 +349,16 @@ func (docker *DockerContainer) CreateContainer(containerName string, containerTa
hostConfig.Memory = int64(containerTask.Resource.Mem * 1024 * 1024)
hostConfig.MemorySwap = int64(containerTask.Resource.Mem * 1024 * 1024)
}
if float64(runtime.NumCPU()) > containerTask.Resource.Cpus && containerTask.Resource.CPUSet > 0 {
if len(containerTask.Resource.CPUSet) > 0 {
//only setting cpu info when cpu request can be met in slave host
cpuList, numaList := util.GetBindingCPUs(int(math.Ceil(containerTask.Resource.Cpus)), int64(containerTask.Resource.Mem))
if len(cpuList) > 0 {
//change int list to string
hostConfig.CPUSetCPUs = util.ListJoin(cpuList)
hostConfig.CPUSetMEMs = util.ListJoin(numaList)
fmt.Fprintf(os.Stdout, "DEBUG: CPU List info %v, numa: %v, post: %s, %s\n", cpuList, numaList, hostConfig.CPUSetCPUs, hostConfig.CPUSetMEMs)
//cpuList, numaList := util.GetBindingCPUs(int(math.Ceil(containerTask.Resource.Cpus)), int64(containerTask.Resource.Mem))
//change int list to string
for _, set := range containerTask.Resource.CPUSet {
hostConfig.CPUSetCPUs += fmt.Sprintf("%s,", set)
hostConfig.CPUSetMEMs
}
hostConfig.CPUSetCPUs = strings.TrimRight(hostConfig.CPUSetCPUs, "")
fmt.Fprintf(os.Stdout, "DEBUG: task(%s) set cpuset(%s)\n", containerTask.Name, hostConfig.CPUSetCPUs)
}

if containerTask.LimitResource != nil && containerTask.LimitResource.Cpus > 0 {
Expand Down
Loading

0 comments on commit 170bf62

Please sign in to comment.