diff --git a/api/doris/v1/doriscluster_util.go b/api/doris/v1/doriscluster_util.go index 131f6979..6f396771 100644 --- a/api/doris/v1/doriscluster_util.go +++ b/api/doris/v1/doriscluster_util.go @@ -2,6 +2,7 @@ package v1 import ( "github.com/selectdb/doris-operator/pkg/common/utils/metadata" + corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "strings" ) @@ -331,3 +332,22 @@ func getFeAddrForBroker(dcr *DorisCluster) (string, int) { return getFEAccessAddrForFEADD(dcr) } + +// GetClusterSecret get the cluster's adminuser and password through the cluster management account and password configuration in crd +func GetClusterSecret(dcr *DorisCluster, secret *corev1.Secret) (adminUserName, password string) { + if secret != nil && secret.Data != nil { + return string(secret.Data["username"]), string(secret.Data["password"]) + } + // AdminUser was deprecated since 1.4.1 + if dcr.Spec.AdminUser != nil { + return dcr.Spec.AdminUser.Name, dcr.Spec.AdminUser.Password + } + return "root", "" +} + +func IsReconcilingStatusPhase(c *ComponentStatus) bool { + return c.ComponentCondition.Phase == Upgrading || + c.ComponentCondition.Phase == Scaling || + c.ComponentCondition.Phase == Restarting || + c.ComponentCondition.Phase == Reconciling +} diff --git a/api/doris/v1/doriscluster_webhook.go b/api/doris/v1/doriscluster_webhook.go index dfe167b9..8dcc74b0 100644 --- a/api/doris/v1/doriscluster_webhook.go +++ b/api/doris/v1/doriscluster_webhook.go @@ -17,7 +17,9 @@ limitations under the License. 
package v1 import ( + "fmt" "k8s.io/apimachinery/pkg/runtime" + kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -56,8 +58,16 @@ func (r *DorisCluster) ValidateCreate() error { // ValidateUpdate implements webhook.Validator so a unnamedwatches will be registered for the type func (r *DorisCluster) ValidateUpdate(old runtime.Object) error { klog.Info("validate update", "name", r.Name) + var errors []error + // fe FeSpec.Replicas must greater than or equal to FeSpec.ElectionNumber + if *r.Spec.FeSpec.Replicas < *r.Spec.FeSpec.ElectionNumber { + errors = append(errors, fmt.Errorf("'FeSpec.Replicas' error: the number of FeSpec.Replicas should greater than or equal to FeSpec.ElectionNumber")) + } + + if len(errors) != 0 { + return kerrors.NewAggregate(errors) + } - // TODO(user): fill in your validation logic upon object update. return nil } diff --git a/api/doris/v1/types.go b/api/doris/v1/types.go index a13342d6..bc2a2538 100644 --- a/api/doris/v1/types.go +++ b/api/doris/v1/types.go @@ -377,6 +377,10 @@ const ( WaitScheduling ComponentPhase = "waitScheduling" HaveMemberFailed ComponentPhase = "haveMemberFailed" Available ComponentPhase = "available" + Initializing ComponentPhase = "initializing" + Upgrading ComponentPhase = "upgrading" + Scaling ComponentPhase = "scaling" + Restarting ComponentPhase = "restarting" ) // +genclient diff --git a/api/doris/v1/zz_generated.deepcopy.go b/api/doris/v1/zz_generated.deepcopy.go index 273aa066..9cb4938b 100644 --- a/api/doris/v1/zz_generated.deepcopy.go +++ b/api/doris/v1/zz_generated.deepcopy.go @@ -24,7 +24,7 @@ package v1 import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
diff --git a/config/operator/operator.yaml b/config/operator/operator.yaml index 808f8c21..866ef133 100644 --- a/config/operator/operator.yaml +++ b/config/operator/operator.yaml @@ -147,6 +147,7 @@ rules: - watch - update - patch + - delete - apiGroups: - "" resources: diff --git a/go.mod b/go.mod index c01a2f22..0aa397ea 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/selectdb/doris-operator -go 1.19 +go 1.21 require ( github.com/davecgh/go-spew v1.1.1 @@ -25,6 +25,7 @@ require ( github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect + github.com/frankban/quicktest v1.14.5 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/zapr v1.2.3 // indirect @@ -59,18 +60,19 @@ require ( github.com/spf13/cast v1.5.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/stretchr/testify v1.8.4 // indirect github.com/subosito/gotenv v1.4.2 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/mod v0.8.0 // indirect - golang.org/x/net v0.10.0 // indirect + golang.org/x/mod v0.14.0 // indirect + golang.org/x/net v0.16.0 // indirect golang.org/x/oauth2 v0.7.0 // indirect - golang.org/x/sys v0.8.0 // indirect - golang.org/x/term v0.8.0 // indirect - golang.org/x/text v0.9.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/term v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect - golang.org/x/tools v0.6.0 // indirect + golang.org/x/tools v0.14.0 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.30.0 // indirect diff --git a/go.sum b/go.sum index 4e207c35..80bfa9ee 100644 --- a/go.sum 
+++ b/go.sum @@ -80,7 +80,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= -github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= +github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= +github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -216,6 +217,7 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -254,6 +256,7 @@ github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E= github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmvt1jM= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= 
+github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -289,6 +292,7 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -315,8 +319,9 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8= github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= 
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -334,6 +339,7 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= @@ -381,8 +387,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -420,8 +426,8 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos= +golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -491,12 +497,12 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term 
v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -505,8 +511,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -563,8 +569,8 @@ golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= +golang.org/x/tools 
v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/helm-charts/doris-operator/templates/clusterrole.yaml b/helm-charts/doris-operator/templates/clusterrole.yaml index 3a6975c1..fc033d13 100644 --- a/helm-charts/doris-operator/templates/clusterrole.yaml +++ b/helm-charts/doris-operator/templates/clusterrole.yaml @@ -69,6 +69,7 @@ rules: - watch - update - patch + - delete - apiGroups: - "" resources: diff --git a/helm-charts/doris/values.yaml b/helm-charts/doris/values.yaml index 4a39837d..cd916286 100644 --- a/helm-charts/doris/values.yaml +++ b/helm-charts/doris/values.yaml @@ -22,7 +22,7 @@ dorisCluster: # 1. run shell: echo -n '{your_password}' | base64 to get password base64 string # 2. run shell: echo -n '{your_user}' | base64 to get user base64 string # 3. 
Fill the encrypted string into the corresponding position - # as follow, username is 'root' , password is 't0p-Secre' + # as follow, username is 'root' , password is 't0p-Secret' authSecret: {} # username: cm9vdA== # password: dDBwLVNlY3JldA== diff --git a/pkg/common/utils/k8s/client.go b/pkg/common/utils/k8s/client.go index 22af9386..2b30cd87 100644 --- a/pkg/common/utils/k8s/client.go +++ b/pkg/common/utils/k8s/client.go @@ -5,11 +5,14 @@ import ( "errors" "fmt" dorisv1 "github.com/selectdb/doris-operator/api/doris/v1" + "github.com/selectdb/doris-operator/pkg/common/utils" + "github.com/selectdb/doris-operator/pkg/common/utils/resource" appv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/autoscaling/v1" v2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -223,3 +226,93 @@ func GetConfigMaps(ctx context.Context, k8scient client.Client, namespace string } return configMaps, nil } + +// get the Service by namespace and name. +func GetService(ctx context.Context, k8sclient client.Client, namespace, name string) (*corev1.Service, error) { + var svc corev1.Service + if err := k8sclient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, &svc); err != nil { + return nil, err + } + return &svc, nil +} + +func GetPods(ctx context.Context, k8sclient client.Client, targetDCR dorisv1.DorisCluster, componentType dorisv1.ComponentType) (corev1.PodList, error) { + pods := corev1.PodList{} + + err := k8sclient.List( + ctx, + &pods, + client.InNamespace(targetDCR.Namespace), + client.MatchingLabels(dorisv1.GetPodLabels(&targetDCR, componentType)), + ) + if err != nil { + return pods, err + } + + return pods, nil +} + +// GetConfig get conf from configmap by componentType , if not use configmap get an empty map. 
+func GetConfig(ctx context.Context, k8sclient client.Client, configMapInfo *dorisv1.ConfigMapInfo, namespace string, componentType dorisv1.ComponentType) (map[string]interface{}, error) { + cms := resource.GetMountConfigMapInfo(*configMapInfo) + if len(cms) == 0 { + return make(map[string]interface{}), nil + } + + configMaps, err := GetConfigMaps(ctx, k8sclient, namespace, cms) + if err != nil { + klog.Errorf("GetConfig get configmap failed, namespace: %s,err: %s \n", namespace, err.Error()) + } + res, resolveErr := resource.ResolveConfigMaps(configMaps, componentType) + return res, utils.MergeError(err, resolveErr) +} + +// SetDorisClusterPhase set DorisCluster Phase status, +// Perform a check before setting, and do not change if the status is the same as the last time +func SetDorisClusterPhase( + ctx context.Context, + k8sclient client.Client, + dcrName, namespace string, + phase dorisv1.ComponentPhase, + componentType dorisv1.ComponentType, +) error { + var edcr dorisv1.DorisCluster + if err := k8sclient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dcrName}, &edcr); err != nil { + return err + } + isStatusEqual := false + switch componentType { + case dorisv1.Component_FE: + isStatusEqual = (edcr.Status.FEStatus.ComponentCondition.Phase == phase) + edcr.Status.FEStatus.ComponentCondition.Phase = phase + case dorisv1.Component_BE: + isStatusEqual = (edcr.Status.BEStatus.ComponentCondition.Phase == phase) + edcr.Status.BEStatus.ComponentCondition.Phase = phase + case dorisv1.Component_CN: + isStatusEqual = (edcr.Status.CnStatus.ComponentCondition.Phase == phase) + edcr.Status.CnStatus.ComponentCondition.Phase = phase + case dorisv1.Component_Broker: + isStatusEqual = (edcr.Status.BrokerStatus.ComponentCondition.Phase == phase) + edcr.Status.BrokerStatus.ComponentCondition.Phase = phase + default: + klog.Infof("SetDorisClusterPhase not support type=", componentType) + return nil + } + if isStatusEqual { + klog.Infof("UpdateDorisClusterPhase will not 
change cluster %s Phase, it is already %s ,DCR name: %s, namespace: %s,", componentType, phase, dcrName, namespace) + return nil + } + return k8sclient.Status().Update(ctx, &edcr) +} + +// DeletePVC clean up existing pvc by pvc name, namespace and labels +func DeletePVC(ctx context.Context, k8sclient client.Client, namespace, pvcName string, labels map[string]string) error { + pvc := corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName, + Namespace: namespace, + Labels: labels, + }, + } + return k8sclient.Delete(ctx, &pvc) +} diff --git a/pkg/common/utils/mysql/decommission.go b/pkg/common/utils/mysql/decommission.go deleted file mode 100644 index fa2ba60e..00000000 --- a/pkg/common/utils/mysql/decommission.go +++ /dev/null @@ -1,39 +0,0 @@ -package mysql - -// DecommissionInfo Decommission task info -type DecommissionInfo struct { - SuccessBes []Backend - DecommissioningBes []Backend -} - -func (di *DecommissionInfo) IsFinished() bool { - if len(di.DecommissioningBes) > 0 { - return false - } - return true -} - -// NewDecommissionInfo build DecommissionInfo check successes nodes and dropping nodes -// currentNodes is show backends res, decommissionBes is decommissioned target be nodes -func NewDecommissionInfo(currentNodes []Backend, decommissionBes []Backend) *DecommissionInfo { - var successBes []Backend - var decommissioningBes []Backend - decommissioningMap := make(map[string]Backend) - - for _, node := range currentNodes { - if node.SystemDecommissioned { - decommissioningBes = append(decommissioningBes, node) - decommissioningMap[node.Host] = node - } - } - - for _, be := range decommissionBes { - _, ok := decommissioningMap[be.Host] - if !ok { - successBes = append(successBes, be) - } - } - //fmt.Printf("finished:%d , droping: %d ", len(successBes), len(decommissioningBes)) - return &DecommissionInfo{successBes, decommissioningBes} - -} diff --git a/pkg/common/utils/mysql/doris.go b/pkg/common/utils/mysql/doris.go index 
ced009c6..b70bb2e8 100644 --- a/pkg/common/utils/mysql/doris.go +++ b/pkg/common/utils/mysql/doris.go @@ -4,6 +4,11 @@ import ( _ "github.com/go-sql-driver/mysql" ) +const ( + FE_FOLLOWER_ROLE = "FOLLOWER" + FE_OBSERVE_ROLE = "OBSERVER" +) + type Frontend struct { Name string `json:"name" db:"Name"` Host string `json:"host" db:"Host"` diff --git a/pkg/common/utils/mysql/mysql.go b/pkg/common/utils/mysql/mysql.go index 9d2ec68c..d8b268ec 100644 --- a/pkg/common/utils/mysql/mysql.go +++ b/pkg/common/utils/mysql/mysql.go @@ -2,7 +2,6 @@ package mysql import ( "database/sql" - "errors" "fmt" _ "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" @@ -25,17 +24,21 @@ func NewDorisSqlDB(cfg DBConfig) (*DB, error) { dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database) db, err := sqlx.Open("mysql", dsn) if err != nil { - klog.Errorf("failed open doris sql client connection, err: %s \n", err) + klog.Errorf("NewDorisSqlDB sqlx.Open failed open doris sql client connection, err: %s \n", err.Error()) return nil, err } if err = db.Ping(); err != nil { - klog.Errorf("failed ping doris sql client connection, err: %s\n", err.Error()) + klog.Errorf("NewDorisSqlDB sqlx.Open.Ping failed ping doris sql client connection, err: %s \n", err.Error()) return nil, err } return &DB{db}, nil } +func (db *DB) Close() error { + return db.DB.Close() +} + func (db *DB) Exec(query string, args ...interface{}) (sql.Result, error) { return db.DB.Exec(query, args...) } @@ -44,19 +47,19 @@ func (db *DB) Select(dest interface{}, query string, args ...interface{}) error return db.DB.Select(dest, query, args...) 
} -func (db *DB) ShowFrontends() ([]Frontend, error) { - var fes []Frontend +func (db *DB) ShowFrontends() ([]*Frontend, error) { + var fes []*Frontend err := db.Select(&fes, "show frontends") return fes, err } -func (db *DB) ShowBackends() ([]Backend, error) { - var bes []Backend +func (db *DB) ShowBackends() ([]*Backend, error) { + var bes []*Backend err := db.Select(&bes, "show backends") return bes, err } -func (db *DB) DecommissionBE(nodes []Backend) error { +func (db *DB) DecommissionBE(nodes []*Backend) error { if len(nodes) == 0 { klog.Infoln("mysql DecommissionBE BE node is empty") return nil @@ -71,15 +74,9 @@ func (db *DB) DecommissionBE(nodes []Backend) error { return err } -func (db *DB) CheckDecommissionBE(nodes []Backend) (isFinished bool, err error) { - backends, err := db.ShowBackends() - info := NewDecommissionInfo(backends, nodes) - return info.IsFinished(), err -} - -func (db *DB) DropObserver(nodes []Frontend) error { +func (db *DB) DropObserver(nodes []*Frontend) error { if len(nodes) == 0 { - klog.Infoln("mysql DropObserver observer node is empty") + klog.Infoln("DropObserver observer node is empty") return nil } var alter string @@ -90,17 +87,37 @@ func (db *DB) DropObserver(nodes []Frontend) error { return err } -func (db *DB) GetMaster() (*Frontend, error) { +func (db *DB) GetObservers() ([]*Frontend, error) { frontends, err := db.ShowFrontends() if err != nil { - klog.Errorf("GetMaster show frontends failed, err: %s\n", err.Error()) + klog.Errorf("GetObservers show frontends failed, err: %s\n", err.Error()) return nil, err } + var res []*Frontend + for _, fe := range frontends { + if fe.Role == FE_OBSERVE_ROLE { + res = append(res, fe) + } + } + return res, nil +} + +// GetFollowers return fe master,all followers(including master) and err +func (db *DB) GetFollowers() (*Frontend, []*Frontend, error) { + frontends, err := db.ShowFrontends() + if err != nil { + klog.Errorf("GetFollowers show frontends failed, err: %s\n", err.Error()) + 
return nil, nil, err + } + var res []*Frontend + var master *Frontend for _, fe := range frontends { - if fe.IsMaster { - return &fe, nil + if fe.Role == FE_FOLLOWER_ROLE { + res = append(res, fe) + if fe.IsMaster { + master = fe + } } } - errMessage := fmt.Sprintf("GetMaster note not find fe master, all of fe nodes info as such: %+v", frontends) - return nil, errors.New(errMessage) + return master, res, nil } diff --git a/pkg/common/utils/mysql/mysql_test.go b/pkg/common/utils/mysql/mysql_test.go index a715083a..bd87b281 100644 --- a/pkg/common/utils/mysql/mysql_test.go +++ b/pkg/common/utils/mysql/mysql_test.go @@ -4,7 +4,6 @@ import ( _ "crypto/tls" "fmt" "testing" - "time" ) func TestAPIs(t *testing.T) { @@ -23,13 +22,6 @@ func TestAPIs(t *testing.T) { } defer db.Close() - // get master - master, err := db.GetMaster() - if err != nil { - fmt.Printf("get master err:%s \n", err.Error()) - } - fmt.Printf("getmaster :%+v \n", master) - // ShowFrontends frontends, err := db.ShowFrontends() if err != nil { @@ -45,9 +37,9 @@ func TestAPIs(t *testing.T) { fmt.Printf("ShowBackends :%+v \n", bes) // DropObserver - arr := []Frontend{ - Frontend{Host: "doriscluster-sample-fe-1.doriscluster-sample-fe-internal.doris.svc.cluster.local", EditLogPort: 9010}, - Frontend{Host: "doriscluster-sample-fe-2.doriscluster-sample-fe-internal.doris.svc.cluster.local", EditLogPort: 9010}, + arr := []*Frontend{ + &Frontend{Host: "doriscluster-sample-fe-1.doriscluster-sample-fe-internal.doris.svc.cluster.local", EditLogPort: 9010}, + &Frontend{Host: "doriscluster-sample-fe-2.doriscluster-sample-fe-internal.doris.svc.cluster.local", EditLogPort: 9010}, } db.DropObserver(arr) @@ -59,24 +51,11 @@ func TestAPIs(t *testing.T) { fmt.Printf("ShowBackends after drop %+v \n", bes) // DecommissionBE - arr1 := []Backend{ - Backend{Host: "doriscluster-sample-be-3.doriscluster-sample-be-internal.doris.svc.cluster.local", HeartbeatPort: 9050}, - Backend{Host: 
"doriscluster-sample-be-4.doriscluster-sample-be-internal.doris.svc.cluster.local", HeartbeatPort: 9050}, + arr1 := []*Backend{ + &Backend{Host: "doriscluster-sample-be-3.doriscluster-sample-be-internal.doris.svc.cluster.local", HeartbeatPort: 9050}, + &Backend{Host: "doriscluster-sample-be-4.doriscluster-sample-be-internal.doris.svc.cluster.local", HeartbeatPort: 9050}, } db.DecommissionBE(arr1) - for i := 0; i < 20000; i++ { - finished, err := db.CheckDecommissionBE(arr1) - fmt.Printf("DecommissionBE check %d : is_finished=%t } \n", i, finished) - if err != nil { - fmt.Printf("DecommissionBEcheck err:%s \n", err.Error()) - } - if finished { - fmt.Printf("DecommissionBE finished") - break - } - time.Sleep(500 * time.Millisecond) - - } bes, err = db.ShowBackends() if err != nil { diff --git a/pkg/common/utils/resource/configmap.go b/pkg/common/utils/resource/configmap.go index 19d8416d..48312606 100644 --- a/pkg/common/utils/resource/configmap.go +++ b/pkg/common/utils/resource/configmap.go @@ -37,6 +37,10 @@ const ( const BROKER_IPC_PORT = "broker_ipc_port" const GRACE_SHUTDOWN_WAIT_SECONDS = "grace_shutdown_wait_seconds" +const ENABLE_FQDN = "enable_fqdn_mode" +const START_MODEL_FQDN = "FQDN" +const START_MODEL_IP = "IP" + // defMap the default port about abilities. 
// ENABLE_FQDN is the fe.conf key that selects the FE host-type start mode.
const ENABLE_FQDN = "enable_fqdn_mode"

// START_MODEL_FQDN and START_MODEL_IP are the two FE start modes the operator
// recognizes (FQDN means pods register by hostname, IP by pod ip).
const START_MODEL_FQDN = "FQDN"
const START_MODEL_IP = "IP"

// GetStartMode returns the fe host type, START_MODEL_FQDN or START_MODEL_IP,
// derived from the 'enable_fqdn_mode' entry of fe.conf.
//
// Rules:
//   - no configmap at all              -> FQDN (operator default)
//   - enable_fqdn_mode is "true"/true  -> FQDN
//   - anything else (absent, "false",
//     non-boolean value)               -> IP
//
// The previous implementation asserted v.(string) without the comma-ok form
// and panicked when the parsed config held a non-string value (e.g. a bool);
// the assertion is now checked via a type switch.
func GetStartMode(config map[string]interface{}) string {
	// not using a configmap: FE is started in FQDN mode by default
	if len(config) == 0 {
		return START_MODEL_FQDN
	}

	switch v := config[ENABLE_FQDN].(type) {
	case string:
		if v == "true" {
			return START_MODEL_FQDN
		}
	case bool:
		if v {
			return START_MODEL_FQDN
		}
	}
	return START_MODEL_IP
}

// BuildPVCName builds the pvc name for one statefulset pod ordinal:
// "<volumeName>-<stsName>-<ordinal>", or "<stsName>-<ordinal>" when the
// persistent volume has no name.
func BuildPVCName(stsName, ordinal, volumeName string) string {
	pvcName := stsName + "-" + ordinal
	if volumeName != "" {
		pvcName = volumeName + "-" + pvcName
	}
	return pvcName
}
corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Name: generatePodTemplateName(dcr, componentType), + Name: GeneratePodTemplateName(dcr, componentType), Annotations: spec.Annotations, Labels: v1.GetPodLabels(dcr, componentType), }, @@ -393,7 +393,7 @@ func getCommand(componentType v1.ComponentType) (commands []string, args []strin } } -func generatePodTemplateName(dcr *v1.DorisCluster, componentType v1.ComponentType) string { +func GeneratePodTemplateName(dcr *v1.DorisCluster, componentType v1.ComponentType) string { switch componentType { case v1.Component_FE: return dcr.Name + "-" + string(v1.Component_FE) diff --git a/pkg/controller/doriscluster_controller.go b/pkg/controller/doriscluster_controller.go index cf440e19..c597f28c 100644 --- a/pkg/controller/doriscluster_controller.go +++ b/pkg/controller/doriscluster_controller.go @@ -108,6 +108,7 @@ func (r *DorisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request } dcr := edcr.DeepCopy() + if !dcr.DeletionTimestamp.IsZero() { r.resourceClean(ctx, dcr) return ctrl.Result{}, nil @@ -125,6 +126,7 @@ func (r *DorisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request r.clearNoEffectResources(ctx, dcr) for _, rc := range r.Scs { //update component status. 
+ if err := rc.UpdateComponentStatus(dcr); err != nil { klog.Errorf("DorisClusterReconciler reconcile update component %s status failed.err=%s\n", rc.GetControllerName(), err.Error()) return requeueIfError(err) @@ -213,7 +215,7 @@ func (r *DorisClusterReconciler) watchPodBuilder(builder *ctrl.Builder) *ctrl.Bu labels := a.GetLabels() dorisName := labels[dorisv1.DorisClusterLabelKey] klog.Infof("DorisClusterReconciler watch pod %s change related to doris cluster %s", a.GetName(), dorisName) - if dorisName == "" { + if dorisName != "" { return []reconcile.Request{ {NamespacedName: types.NamespacedName{ Name: dorisName, diff --git a/pkg/controller/sub_controller/be/controller.go b/pkg/controller/sub_controller/be/controller.go index 54fee7c9..a7c60ec7 100644 --- a/pkg/controller/sub_controller/be/controller.go +++ b/pkg/controller/sub_controller/be/controller.go @@ -7,11 +7,9 @@ import ( "github.com/selectdb/doris-operator/pkg/common/utils/resource" "github.com/selectdb/doris-operator/pkg/controller/sub_controller" appv1 "k8s.io/api/apps/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" - "time" ) type Controller struct { @@ -39,7 +37,7 @@ func (be *Controller) Sync(ctx context.Context, dcr *v1.DorisCluster) error { if dcr.Spec.BeSpec == nil { return nil } - + be.InitStatus(dcr, v1.Component_BE) if !be.FeAvailable(dcr) { return nil } @@ -94,20 +92,7 @@ func (be *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error { return nil } - bs := &v1.ComponentStatus{ - ComponentCondition: v1.ComponentCondition{ - SubResourceName: v1.GenerateComponentStatefulSetName(cluster, v1.Component_BE), - Phase: v1.Reconciling, - LastTransitionTime: metav1.NewTime(time.Now()), - }, - } - - if cluster.Status.BEStatus != nil { - bs = cluster.Status.BEStatus.DeepCopy() - } - cluster.Status.BEStatus = bs - bs.AccessService = v1.GenerateExternalServiceName(cluster, v1.Component_BE) - 
// Event reasons recorded by the sub controllers. A reason should be short,
// unique and in UpperCamelCase format (starting with a capital letter).
const (
	StatefulSetNotExist    = "StatefulSetNotExist"
	AutoScalerDeleteFailed = "AutoScalerDeleteFailed"
	ComponentImageUpdate   = "ComponentImageUpdate"

	// pvc lifecycle reasons
	PVCListFailed = "PVCListFailed"
	// NOTE(review): value intentionally differs from the identifier;
	// kept byte-identical for compatibility with existing event consumers.
	PVCUpdate       = "PVCUpdated"
	PVCUpdateFailed = "PVCUpdateFailed"
	PVCDelete       = "PVCDelete"
	PVCDeleteFailed = "PVCDeleteFailed"
	PVCCreate       = "PVCCreate"
	PVCCreateFailed = "PVCCreateFailed"

	FollowerScaleDownFailed     = "FollowerScaleDownFailed"
	ConfigMapPathRepeated       = "ConfigMapPathRepeated"
	ClusterOperationalConflicts = "ClusterOperationalConflicts"
)
"github.com/selectdb/doris-operator/pkg/common/utils/resource" "github.com/selectdb/doris-operator/pkg/controller/sub_controller" appv1 "k8s.io/api/apps/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" - "time" ) type Controller struct { @@ -23,6 +21,10 @@ func (fc *Controller) ClearResources(ctx context.Context, cluster *v1.DorisClust if cluster.Status.FEStatus == nil { return true, nil } + if err := fc.RecycleResources(ctx, cluster, v1.Component_FE); err != nil { + klog.Errorf("fe ClearResources recycle pvc resource for reconciling namespace %s name %s!", cluster.Namespace, cluster.Name) + return false, err + } if cluster.DeletionTimestamp.IsZero() { return true, nil @@ -38,22 +40,7 @@ func (fc *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error { return nil } - fs := &v1.ComponentStatus{ - ComponentCondition: v1.ComponentCondition{ - SubResourceName: v1.GenerateComponentStatefulSetName(cluster, v1.Component_FE), - Phase: v1.Reconciling, - LastTransitionTime: metav1.NewTime(time.Now()), - }, - } - - if cluster.Status.FEStatus != nil { - fs = cluster.Status.FEStatus.DeepCopy() - } - - cluster.Status.FEStatus = fs - fs.AccessService = v1.GenerateExternalServiceName(cluster, v1.Component_FE) - - return fc.ClassifyPodsByStatus(cluster.Namespace, fs, v1.GenerateStatefulSetSelector(cluster, v1.Component_FE), *cluster.Spec.FeSpec.Replicas) + return fc.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.FEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_FE), *cluster.Spec.FeSpec.Replicas) } // New construct a FeController. 
@@ -76,6 +63,7 @@ func (fc *Controller) Sync(ctx context.Context, cluster *v1.DorisCluster) error klog.Info("fe Controller Sync ", "the fe component is not needed ", "namespace ", cluster.Namespace, " doris cluster name ", cluster.Name) return nil } + fc.InitStatus(cluster, v1.Component_FE) feSpec := cluster.Spec.FeSpec //get the fe configMap for resolve ports. @@ -101,35 +89,24 @@ func (fc *Controller) Sync(ctx context.Context, cluster *v1.DorisCluster) error return err } - st := fc.buildFEStatefulSet(cluster) if !fc.PrepareReconcileResources(ctx, cluster, v1.Component_FE) { klog.Infof("fe controller sync preparing resource for reconciling namespace %s name %s!", cluster.Namespace, cluster.Name) return nil } - if err = k8s.ApplyStatefulSet(ctx, fc.K8sclient, &st, func(new *appv1.StatefulSet, est *appv1.StatefulSet) bool { - //It is not allowed to set replicas smaller than electionNumber when scale down - electionNumber := *cluster.Spec.FeSpec.ElectionNumber - if *st.Spec.Replicas < electionNumber && *st.Spec.Replicas < *est.Spec.Replicas { - //if electionNumber > *est.Spec.Replicas ,Replicas should be corrected to *est.Spec.Replicas - //if electionNumber < *est.Spec.Replicas ,Replicas should be corrected to electionNumber - *cluster.Spec.FeSpec.Replicas = min(electionNumber, *est.Spec.Replicas) - *st.Spec.Replicas = min(electionNumber, *est.Spec.Replicas) - fc.K8srecorder.Event(cluster, sub_controller.EventWarning, sub_controller.FollowerScaleDownFailed, "Replicas is not allow less than ElectionNumber,may violation of consistency agreement cause FE to be unavailable, replicas set to min(electionNumber, currentReplicas): "+string(min(electionNumber, *est.Spec.Replicas))) - } - fc.RestrictConditionsEqual(new, est) - return resource.StatefulSetDeepEqual(new, est, false) + if err = fc.prepareStatefulsetApply(ctx, cluster); err != nil { + return err + } + + st := fc.buildFEStatefulSet(cluster) + if err = k8s.ApplyStatefulSet(ctx, fc.K8sclient, &st, func(new 
*appv1.StatefulSet, old *appv1.StatefulSet) bool { + fc.RestrictConditionsEqual(new, old) + return resource.StatefulSetDeepEqual(new, old, false) }); err != nil { klog.Errorf("fe controller sync statefulset name=%s, namespace=%s, clusterName=%s failed. message=%s.", st.Name, st.Namespace, cluster.Name, err.Error()) return err } - return nil -} -func min(a, b int32) int32 { - if a < b { - return a - } - return b + return nil } diff --git a/pkg/controller/sub_controller/fe/prepare_modify.go b/pkg/controller/sub_controller/fe/prepare_modify.go new file mode 100644 index 00000000..42e4ca5d --- /dev/null +++ b/pkg/controller/sub_controller/fe/prepare_modify.go @@ -0,0 +1,196 @@ +package fe + +import ( + "context" + v1 "github.com/selectdb/doris-operator/api/doris/v1" + "github.com/selectdb/doris-operator/pkg/common/utils/k8s" + "github.com/selectdb/doris-operator/pkg/common/utils/mysql" + "github.com/selectdb/doris-operator/pkg/common/utils/resource" + appv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sort" + "strconv" + "strings" +) + +// prepareStatefulsetApply means Pre-operation and status control on the client side +func (fc *Controller) prepareStatefulsetApply(ctx context.Context, cluster *v1.DorisCluster) error { + var oldSt appv1.StatefulSet + err := fc.K8sclient.Get(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: v1.GenerateComponentStatefulSetName(cluster, v1.Component_FE)}, &oldSt) + if err != nil { + klog.Infof("fe controller controlClusterPhaseAndPreOperation get fe StatefulSet failed, err: %s", err.Error()) + return nil + } + scaleNumber := *(cluster.Spec.FeSpec.Replicas) - *(oldSt.Spec.Replicas) + // fe scale + if scaleNumber != 0 { // set fe Phase as SCALING + cluster.Status.FEStatus.ComponentCondition.Phase = v1.Scaling + // In Reconcile, it is possible that the status cannot be updated in time, + // resulting in an error in the status judgment based on the last 
status, + // so the status will be forced to modify here + if err := k8s.SetDorisClusterPhase(ctx, fc.K8sclient, cluster.Name, cluster.Namespace, v1.Scaling, v1.Component_FE); err != nil { + klog.Errorf("SetDorisClusterPhase 'SCALING' failed err:%s ", err.Error()) + return err + } + } + if scaleNumber < 0 { + if err := fc.dropObserverFromSqlClient(ctx, fc.K8sclient, cluster); err != nil { + klog.Errorf("ScaleDownObserver failed, err:%s ", err.Error()) + return err + } + return nil + } + + //TODO check upgrade ,restart + + return nil +} + +// dropObserverFromSqlClient handles doris'SQL(drop frontend) through the MySQL client when dealing with scale in observer +// targetDCR is new dcr +// scaleNumber is the number of Observer needing scale down +func (fc *Controller) dropObserverFromSqlClient(ctx context.Context, k8sclient client.Client, targetDCR *v1.DorisCluster) error { + // get adminuserName and pwd + secret, _ := k8s.GetSecret(ctx, k8sclient, targetDCR.Namespace, targetDCR.Spec.AuthSecret) + adminUserName, password := v1.GetClusterSecret(targetDCR, secret) + // get host and port + serviceName := v1.GenerateExternalServiceName(targetDCR, v1.Component_FE) + maps, _ := k8s.GetConfig(ctx, k8sclient, &targetDCR.Spec.FeSpec.ConfigMapInfo, targetDCR.Namespace, v1.Component_FE) + queryPort := resource.GetPort(maps, resource.QUERY_PORT) + + // connect to doris sql to get master node + // It may not be the master, or even the node that needs to be deleted, causing the deletion SQL to fail. 
+ dbConf := mysql.DBConfig{ + User: adminUserName, + Password: password, + Host: serviceName, + Port: strconv.FormatInt(int64(queryPort), 10), + Database: "mysql", + } + loadBalanceDBClient, err := mysql.NewDorisSqlDB(dbConf) + if err != nil { + klog.Errorf("DropObserverFromSqlClient failed, get fe node connection err:%s", err.Error()) + return err + } + defer loadBalanceDBClient.Close() + master, _, err := loadBalanceDBClient.GetFollowers() + if err != nil { + klog.Errorf("DropObserverFromSqlClient GetFollowers master failed, err:%s", err.Error()) + return err + } + var masterDBClient *mysql.DB + if master.CurrentConnected == "Yes" { + masterDBClient = loadBalanceDBClient + } else { + // Get the connection to the master + masterDBClient, err = mysql.NewDorisSqlDB(mysql.DBConfig{ + User: adminUserName, + Password: password, + Host: master.Host, + Port: strconv.FormatInt(int64(queryPort), 10), + Database: "mysql", + }) + if err != nil { + klog.Errorf("DropObserverFromSqlClient failed, get fe master connection err:%s", err.Error()) + return err + } + defer masterDBClient.Close() + } + + // get all Observes + allObserves, err := masterDBClient.GetObservers() + if err != nil { + klog.Errorf("DropObserverFromSqlClient failed, GetObservers err:%s", err.Error()) + return err + } + + // make sure real sclaeNumber, this may involve retrying tasks and scaling down followers. 
+ realSclaeNumber := int32(len(allObserves)) - *(targetDCR.Spec.FeSpec.Replicas) + *(targetDCR.Spec.FeSpec.ElectionNumber) + if realSclaeNumber <= 0 { + klog.Errorf("DropObserverFromSqlClient failed, Observers number(%d) is not larger than scale number(%d) ", len(allObserves), *(targetDCR.Spec.FeSpec.Replicas)-*(targetDCR.Spec.FeSpec.ElectionNumber)) + return nil + } + + // get scale Observes + var frontendMap map[int]*mysql.Frontend // frontendMap key is fe pod index ,value is frontend + podTemplateName := resource.GeneratePodTemplateName(targetDCR, v1.Component_FE) + + if resource.GetStartMode(maps) == resource.START_MODEL_FQDN { // use host + frontendMap, err = buildSeqNumberToFrontend(allObserves, nil, podTemplateName) + if err != nil { + klog.Errorf("DropObserverFromSqlClient failed, buildSeqNumberToFrontend err:%s", err.Error()) + return nil + } + } else { // use ip + podMap := make(map[string]string) // key is pod ip, value is pod name + pods, err := k8s.GetPods(ctx, k8sclient, *targetDCR, v1.Component_FE) + if err != nil { + klog.Errorf("DropObserverFromSqlClient failed, GetPods err:%s", err) + return nil + } + for _, item := range pods.Items { + if strings.HasPrefix(item.GetName(), podTemplateName) { + podMap[item.Status.PodIP] = item.GetName() + } + } + frontendMap, err = buildSeqNumberToFrontend(allObserves, podMap, podTemplateName) + if err != nil { + klog.Errorf("DropObserverFromSqlClient failed, buildSeqNumberToFrontend err:%s", err.Error()) + return nil + } + } + observes := getFirstFewFrontendsAfterDescendOrder(frontendMap, realSclaeNumber) + // drop node and return + return masterDBClient.DropObserver(observes) + +} + +// buildSeqNumberToFrontend +// input ipMap key is podIP,value is fe.podName(from 'kubectl get pods -owide') +// return frontendMap key is fe pod index ,value is frontend +func buildSeqNumberToFrontend(frontends []*mysql.Frontend, ipMap map[string]string, podTemplateName string) (map[int]*mysql.Frontend, error) { + frontendMap := 
make(map[int]*mysql.Frontend) + for _, fe := range frontends { + var podSignName string + if strings.HasPrefix(fe.Host, podTemplateName) { + // use fqdn, not need ipMap + // podSignName like: doriscluster-sample-fe-0.doriscluster-sample-fe-internal.doris.svc.cluster.local + podSignName = fe.Host + } else { + // use ip + // podSignName like: doriscluster-sample-fe-0 + podSignName = ipMap[fe.Host] + } + split := strings.Split(strings.Split(strings.Split(podSignName, podTemplateName)[1], ".")[0], "-") + num, err := strconv.Atoi(split[len(split)-1]) + if err != nil { + klog.Errorf("buildSeqNumberToFrontend can not split pod name,pod name: %s,err:%s", podSignName, err.Error()) + return nil, err + } + frontendMap[num] = fe + } + return frontendMap, nil +} + +// GetFirstFewFrontendsAfterDescendOrder means descending sort fe by index and return top scaleNumber +func getFirstFewFrontendsAfterDescendOrder(frontendMap map[int]*mysql.Frontend, scaleNumber int32) []*mysql.Frontend { + var topFrontends []*mysql.Frontend + if int(scaleNumber) <= len(frontendMap) { + keys := make([]int, 0, len(frontendMap)) + for k := range frontendMap { + keys = append(keys, k) + } + sort.Slice(keys, func(i, j int) bool { + return keys[i] > keys[j] + }) + + for i := 0; i < int(scaleNumber); i++ { + topFrontends = append(topFrontends, frontendMap[keys[i]]) + } + } else { + klog.Errorf("getFirstFewFrontendsAfterDescendOrder frontendMap size(%d) not larger than scaleNumber(%d)", len(frontendMap), scaleNumber) + } + return topFrontends +} diff --git a/pkg/controller/sub_controller/sub_controller.go b/pkg/controller/sub_controller/sub_controller.go index 82a9dc70..62115916 100644 --- a/pkg/controller/sub_controller/sub_controller.go +++ b/pkg/controller/sub_controller/sub_controller.go @@ -4,18 +4,20 @@ import ( "context" "fmt" dorisv1 "github.com/selectdb/doris-operator/api/doris/v1" - "github.com/selectdb/doris-operator/pkg/common/utils" + utils "github.com/selectdb/doris-operator/pkg/common/utils" 
"github.com/selectdb/doris-operator/pkg/common/utils/k8s" "github.com/selectdb/doris-operator/pkg/common/utils/resource" appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "strconv" "strings" + "time" ) type SubController interface { @@ -62,7 +64,6 @@ func (d *SubDefaultController) ClassifyPodsByStatus(namespace string, status *do } } - status.ComponentCondition.Phase = dorisv1.Reconciling if len(readys) == int(replicas) { status.ComponentCondition.Phase = dorisv1.Available } else if len(faileds) != 0 { @@ -81,16 +82,11 @@ func (d *SubDefaultController) ClassifyPodsByStatus(namespace string, status *do } func (d *SubDefaultController) GetConfig(ctx context.Context, configMapInfo *dorisv1.ConfigMapInfo, namespace string, componentType dorisv1.ComponentType) (map[string]interface{}, error) { - cms := resource.GetMountConfigMapInfo(*configMapInfo) - if len(cms) == 0 { - return make(map[string]interface{}), nil - } - configMaps, err := k8s.GetConfigMaps(ctx, d.K8sclient, namespace, cms) + config, err := k8s.GetConfig(ctx, d.K8sclient, configMapInfo, namespace, componentType) if err != nil { klog.Errorf("SubDefaultController GetConfig get configmap failed, namespace: %s,err: %s \n", namespace, err.Error()) } - res, resolveErr := resource.ResolveConfigMaps(configMaps, componentType) - return res, utils.MergeError(err, resolveErr) + return config, nil } // generate map for mountpath:configmap @@ -327,3 +323,147 @@ func (d *SubDefaultController) patchPVCs(ctx context.Context, dcr *dorisv1.Doris return prepared } + +// RecycleResources pvc resource for recycle +func (d *SubDefaultController) RecycleResources(ctx context.Context, dcr *dorisv1.DorisCluster, componentType dorisv1.ComponentType) error { + switch componentType { + case dorisv1.Component_FE: + 
return d.recycleFEResources(ctx, dcr) + default: + klog.Infof("RecycleResources not support type=%s", componentType) + return nil + } +} + +// recycleFEResources pvc resource for fe recycle +func (d *SubDefaultController) recycleFEResources(ctx context.Context, dcr *dorisv1.DorisCluster) error { + if len(dcr.Spec.FeSpec.PersistentVolumes) != 0 { + return d.listAndDeletePersistentVolumeClaim(ctx, dcr, dorisv1.Component_FE) + } + return nil +} + +// listAndDeletePersistentVolumeClaim: +// 1. list pvcs by statefulset selector labels . +// 2. get pvcs by dorisv1.PersistentVolume.name +// 2.1 travel pvcs, use key="-^"+volume.name, value=pvc put into map. starting with "-^" as the k8s resource name not allowed start with it. +// 3. delete pvc +func (d *SubDefaultController) listAndDeletePersistentVolumeClaim(ctx context.Context, dcr *dorisv1.DorisCluster, componentType dorisv1.ComponentType) error { + var volumes []dorisv1.PersistentVolume + var replicas int32 + switch componentType { + case dorisv1.Component_FE: + volumes = dcr.Spec.FeSpec.PersistentVolumes + replicas = *dcr.Spec.FeSpec.Replicas + case dorisv1.Component_BE: + volumes = dcr.Spec.BeSpec.PersistentVolumes + replicas = *dcr.Spec.BeSpec.Replicas + case dorisv1.Component_CN: + volumes = dcr.Spec.CnSpec.PersistentVolumes + replicas = *dcr.Spec.CnSpec.Replicas + default: + } + + pvcList := corev1.PersistentVolumeClaimList{} + selector := dorisv1.GenerateStatefulSetSelector(dcr, componentType) + stsName := dorisv1.GenerateComponentStatefulSetName(dcr, componentType) + if err := d.K8sclient.List(ctx, &pvcList, client.InNamespace(dcr.Namespace), client.MatchingLabels(selector)); err != nil { + d.K8srecorder.Event(dcr, EventWarning, PVCListFailed, string("list component "+componentType+" failed!")) + return err + } + //classify pvc by volume.Name, pvc.name generate by volume.Name + statefulset.Name + ordinal + pvcMap := make(map[string][]corev1.PersistentVolumeClaim) + + for _, pvc := range pvcList.Items { + 
//start with unique string for classify pvc, avoid empty string match all pvc.Name + key := "-^" + for _, volume := range volumes { + if volume.Name != "" && strings.HasPrefix(pvc.Name, volume.Name) { + key = key + volume.Name + break + } + } + + if _, ok := pvcMap[key]; !ok { + pvcMap[key] = []corev1.PersistentVolumeClaim{} + } + pvcMap[key] = append(pvcMap[key], pvc) + } + + var mergeError error + for _, volume := range volumes { + // Clean up the existing PVC that is larger than expected + claims := pvcMap["-^"+volume.Name] + if len(claims) <= int(replicas) { + continue + } + if err := d.deletePVCs(ctx, dcr, selector, len(claims), stsName, volume.Name, replicas); err != nil { + mergeError = utils.MergeError(mergeError, err) + } + } + return mergeError +} + +// deletePVCs will Loop to remove excess pvc +func (d *SubDefaultController) deletePVCs(ctx context.Context, dcr *dorisv1.DorisCluster, selector map[string]string, + pvcSize int, stsName, volumeName string, replicas int32) error { + maxOrdinal := pvcSize + + var mergeError error + for ; maxOrdinal > int(replicas); maxOrdinal-- { + pvcName := resource.BuildPVCName(stsName, strconv.Itoa(maxOrdinal-1), volumeName) + if err := k8s.DeletePVC(ctx, d.K8sclient, dcr.Namespace, pvcName, selector); err != nil { + d.K8srecorder.Event(dcr, EventWarning, PVCDeleteFailed, err.Error()) + klog.Errorf("SubController namespace %s name %s delete pvc %s failed, %s.", dcr.Namespace, dcr.Name, pvcName) + mergeError = utils.MergeError(mergeError, err) + } + } + return mergeError +} + +func (d *SubDefaultController) InitStatus(dcr *dorisv1.DorisCluster, componentType dorisv1.ComponentType) { + switch componentType { + case dorisv1.Component_FE: + d.initFEStatus(dcr) + case dorisv1.Component_BE: + d.initBEStatus(dcr) + default: + klog.Infof("InitStatus not support type=", componentType) + } +} + +func (d *SubDefaultController) initFEStatus(cluster *dorisv1.DorisCluster) { + initPhase := dorisv1.Initializing + // When in the Change 
phase, the state should inherit the last state instead of using the default state. Prevent incorrect Initializing of the change state + if cluster.Status.FEStatus != nil && dorisv1.IsReconcilingStatusPhase(cluster.Status.FEStatus) { + initPhase = cluster.Status.FEStatus.ComponentCondition.Phase + } + + status := &dorisv1.ComponentStatus{ + ComponentCondition: dorisv1.ComponentCondition{ + SubResourceName: dorisv1.GenerateComponentStatefulSetName(cluster, dorisv1.Component_FE), + Phase: initPhase, + LastTransitionTime: metav1.NewTime(time.Now()), + }, + } + status.AccessService = dorisv1.GenerateExternalServiceName(cluster, dorisv1.Component_FE) + cluster.Status.FEStatus = status +} + +func (d *SubDefaultController) initBEStatus(cluster *dorisv1.DorisCluster) { + initPhase := dorisv1.Initializing + // When in the Change phase, the state should inherit the last state instead of using the default state. Prevent incorrect Initializing of the change state + if cluster.Status.BEStatus != nil && dorisv1.IsReconcilingStatusPhase(cluster.Status.BEStatus) { + initPhase = cluster.Status.BEStatus.ComponentCondition.Phase + } + + status := &dorisv1.ComponentStatus{ + ComponentCondition: dorisv1.ComponentCondition{ + SubResourceName: dorisv1.GenerateComponentStatefulSetName(cluster, dorisv1.Component_BE), + Phase: initPhase, + LastTransitionTime: metav1.NewTime(time.Now()), + }, + } + status.AccessService = dorisv1.GenerateExternalServiceName(cluster, dorisv1.Component_BE) + cluster.Status.BEStatus = status +}