diff --git a/pkg/cmd/cli.go b/pkg/cmd/cli.go index 6c8cc5936..0d6865617 100644 --- a/pkg/cmd/cli.go +++ b/pkg/cmd/cli.go @@ -49,7 +49,6 @@ import ( "github.com/apecloud/kbcli/pkg/cmd/dataprotection" "github.com/apecloud/kbcli/pkg/cmd/fault" "github.com/apecloud/kbcli/pkg/cmd/kubeblocks" - "github.com/apecloud/kbcli/pkg/cmd/migration" "github.com/apecloud/kbcli/pkg/cmd/options" "github.com/apecloud/kbcli/pkg/cmd/playground" "github.com/apecloud/kbcli/pkg/cmd/plugin" @@ -179,7 +178,6 @@ A Command Line Interface for KubeBlocks`, clusterdefinition.NewClusterDefinitionCmd(f, ioStreams), alert.NewAlertCmd(f, ioStreams), addon.NewAddonCmd(f, ioStreams), - migration.NewMigrationCmd(f, ioStreams), plugin.NewPluginCmd(ioStreams), fault.NewFaultCmd(f, ioStreams), builder.NewBuilderCmd(f, ioStreams), diff --git a/pkg/cmd/migration/base.go b/pkg/cmd/migration/base.go deleted file mode 100644 index e5f04f4e0..000000000 --- a/pkg/cmd/migration/base.go +++ /dev/null @@ -1,261 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - "context" - "fmt" - "os" - "strings" - - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/dynamic" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - - "github.com/apecloud/kbcli/pkg/types" - migrationv1 "github.com/apecloud/kbcli/pkg/types/migrationapi" - "github.com/apecloud/kbcli/pkg/util" -) - -const ( - MigrationTaskLabel = "datamigration.apecloud.io/migrationtask" - MigrationTaskStepAnnotation = "datamigration.apecloud.io/step" - SerialJobOrderAnnotation = "common.apecloud.io/serial_job_order" -) - -const ( - invalidMigrationCrdAdvice = "to use migration-related functions, please ensure that the addon of migration is enabled, use: 'kbcli addon enable migration' to enable the addon" -) - -// Endpoint -// Todo: For the source or target is cluster in KubeBlocks. A better way is to get secret from {$clustername}-conn-credential, so the username, password, addresses can be omitted - -type EndpointModel struct { - UserName string `json:"userName"` - Password string `json:"password"` - Address string `json:"address"` - // +optional - Database string `json:"databaseName,omitempty"` -} - -func (e *EndpointModel) BuildFromStr(msgArr *[]string, endpointStr string) error { - if endpointStr == "" { - BuildErrorMsg(msgArr, "endpoint string cannot be empty") - return nil - } - e.clear() - endpointStr = strings.TrimSpace(endpointStr) - accountURLPair := strings.Split(endpointStr, "@") - if len(accountURLPair) != 2 { - BuildErrorMsg(msgArr, "endpoint may not contain account info") - return nil - } - accountPair := strings.Split(accountURLPair[0], ":") - if len(accountPair) != 2 { - BuildErrorMsg(msgArr, "the account info in endpoint is invalid, should be like \"user:123456\"") - return nil - } - e.UserName = accountPair[0] - e.Password = accountPair[1] - if strings.LastIndex(accountURLPair[1], "/") != -1 { - addressDatabasePair := strings.Split(accountURLPair[1], "/") - e.Address = strings.Join(addressDatabasePair[:len(addressDatabasePair)-1], "/") - e.Database = addressDatabasePair[len(addressDatabasePair)-1] - } else { - e.Address = accountURLPair[1] - } - return nil -} - -func (e *EndpointModel) clear() { - e.Address = "" - e.Password = "" - e.UserName = "" - e.Database = "" -} - -// Migration Object - -type MigrationObjectModel struct { - WhiteList []DBObjectExpress `json:"whiteList"` -} - -type DBObjectExpress struct { - SchemaName string `json:"schemaName"` - // +optional - IsAll bool `json:"isAll"` - // +optional - TableList []TableObjectExpress `json:"tableList"` -} - -type TableObjectExpress struct { - TableName string `json:"tableName"` - // +optional - IsAll bool `json:"isAll"` -} - -func (m *MigrationObjectModel) BuildFromStrs(errMsgArr *[]string, objStrs []string) error { - if len(objStrs) == 0 { - BuildErrorMsg(errMsgArr, "migration object cannot be empty") - return nil - } - for _, str := range objStrs { - msg := "" - if str == "" { - msg = "the database or database.table in migration object cannot be empty" - } - dbTablePair := strings.Split(str, ".") - if len(dbTablePair) > 2 { - msg = fmt.Sprintf("[%s] is not a valid database or database.table", str) - } - if msg != "" { - BuildErrorMsg(errMsgArr, msg) - return nil - } - if len(dbTablePair) == 1 { - m.WhiteList = append(m.WhiteList, DBObjectExpress{ - SchemaName: str, - IsAll: true, - }) - } else { - dbObjPoint, err := m.ContainSchema(dbTablePair[0]) - if err != nil { - return err - } - if dbObjPoint != nil { - dbObjPoint.TableList = append(dbObjPoint.TableList, TableObjectExpress{ - TableName: dbTablePair[1], - IsAll: true, - }) - } else { - m.WhiteList = append(m.WhiteList, DBObjectExpress{ - SchemaName: dbTablePair[0], - TableList: []TableObjectExpress{{ - TableName: dbTablePair[1], - IsAll: true, - }}, - }) - } - } - } - return nil -} - -func (m *MigrationObjectModel) ContainSchema(schemaName string) (*DBObjectExpress, error) { - for i := 0; i < len(m.WhiteList); i++ { - if m.WhiteList[i].SchemaName == schemaName { - return &m.WhiteList[i], nil - } - } - return nil, nil -} - -func CliStepChangeToStructure() (map[string]string, []string) { - validStepMap := map[string]string{ - migrationv1.CliStepPreCheck.String(): migrationv1.CliStepPreCheck.String(), - migrationv1.CliStepInitStruct.String(): migrationv1.CliStepInitStruct.String(), - migrationv1.CliStepInitData.String(): migrationv1.CliStepInitData.String(), - migrationv1.CliStepCdc.String(): migrationv1.CliStepCdc.String(), - } - validStepKey := make([]string, 0) - for k := range validStepMap { - validStepKey = append(validStepKey, k) - } - return validStepMap, validStepKey -} - -type TaskTypeEnum string - -const ( - Initialization TaskTypeEnum = "initialization" - InitializationAndCdc TaskTypeEnum = "initialization-and-cdc" // default value -) - -func (s TaskTypeEnum) String() string { - return string(s) -} - -func IsMigrationCrdValidWithDynamic(dynamic *dynamic.Interface) (bool, error) { - resource := types.CustomResourceDefinitionGVR() - if err := APIResource(dynamic, &resource, "migrationtasks.datamigration.apecloud.io", "", nil); err != nil { - return false, err - } - if err := APIResource(dynamic, &resource, "migrationtemplates.datamigration.apecloud.io", "", nil); err != nil { - return false, err - } - if err := APIResource(dynamic, &resource, "serialjobs.common.apecloud.io", "", nil); err != nil { - return false, err - } - return true, nil -} - -func PrintCrdInvalidError(err error) { - if err == nil { - return - } - if !errors.IsNotFound(err) { - util.CheckErr(err) - } - fmt.Fprintf(os.Stderr, "hint: %s\n", invalidMigrationCrdAdvice) - os.Exit(cmdutil.DefaultErrorExitCode) -} - -func IsMigrationCrdValidWithFactory(factory cmdutil.Factory) (bool, error) { - dynamic, err := factory.DynamicClient() - if err != nil { - return false, err - } - return IsMigrationCrdValidWithDynamic(&dynamic) -} - -func APIResource(dynamic *dynamic.Interface, resource *schema.GroupVersionResource, name string, namespace string, res interface{}) error { - obj, err := (*dynamic).Resource(*resource).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}, "") - if err != nil { - return err - } - if res != nil { - return runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, res) - } - return nil -} - -func BuildErrorMsg(msgArr *[]string, msg string) { - if *msgArr == nil { - *msgArr = make([]string, 1) - } - *msgArr = append(*msgArr, msg) -} - -func BuildInitializationStepsOrder(task *migrationv1.MigrationTask, template *migrationv1.MigrationTemplate) []string { - stepMap := make(map[string]string) - for _, taskStep := range task.Spec.Initialization.Steps { - stepMap[taskStep.String()] = taskStep.String() - } - resultArr := make([]string, 0) - for _, stepModel := range template.Spec.Initialization.Steps { - if stepMap[stepModel.Step.String()] != "" { - resultArr = append(resultArr, stepModel.Step.CliString()) - } - } - return resultArr -} diff --git a/pkg/cmd/migration/base_test.go b/pkg/cmd/migration/base_test.go deleted file mode 100644 index 5cc0b172f..000000000 --- a/pkg/cmd/migration/base_test.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - v1alpha1 "github.com/apecloud/kbcli/pkg/types/migrationapi" -) - -var _ = Describe("base", func() { - - Context("Basic function validate", func() { - - It("CliStepChangeToStructure", func() { - resultMap, resultKeyArr := CliStepChangeToStructure() - Expect(len(resultMap)).Should(Equal(4)) - Expect(len(resultKeyArr)).Should(Equal(4)) - }) - - It("BuildInitializationStepsOrder", func() { - task := &v1alpha1.MigrationTask{ - Spec: v1alpha1.MigrationTaskSpec{ - Initialization: v1alpha1.InitializationConfig{ - Steps: []v1alpha1.StepEnum{ - v1alpha1.StepFullLoad, - v1alpha1.StepStructPreFullLoad, - }, - }, - }, - } - template := &v1alpha1.MigrationTemplate{ - Spec: v1alpha1.MigrationTemplateSpec{ - Initialization: v1alpha1.InitializationModel{ - Steps: []v1alpha1.StepModel{ - {Step: v1alpha1.StepStructPreFullLoad}, - {Step: v1alpha1.StepFullLoad}, - }, - }, - }, - } - arr := BuildInitializationStepsOrder(task, template) - Expect(len(arr)).Should(Equal(2)) - Expect(arr[0]).Should(Equal(v1alpha1.StepStructPreFullLoad.CliString())) - Expect(arr[1]).Should(Equal(v1alpha1.StepFullLoad.CliString())) - }) - }) - -}) diff --git a/pkg/cmd/migration/cmd_builder.go b/pkg/cmd/migration/cmd_builder.go deleted file mode 100644 index 56e947c9e..000000000 --- a/pkg/cmd/migration/cmd_builder.go +++ /dev/null @@ -1,60 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - "github.com/spf13/cobra" - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - "k8s.io/kubectl/pkg/util/templates" -) - -// NewMigrationCmd creates the cluster command -func NewMigrationCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command { - cmd := &cobra.Command{ - Use: "migration", - Short: "Data migration between two data sources.", - } - - groups := templates.CommandGroups{ - { - Message: "Basic Migration Commands:", - Commands: []*cobra.Command{ - NewMigrationCreateCmd(f, streams), - NewMigrationTemplatesCmd(f, streams), - NewMigrationListCmd(f, streams), - NewMigrationTerminateCmd(f, streams), - }, - }, - { - Message: "Migration Operation Commands:", - Commands: []*cobra.Command{ - NewMigrationDescribeCmd(f, streams), - NewMigrationLogsCmd(f, streams), - }, - }, - } - - // add subcommands - groups.Add(cmd) - templates.ActsAsRootCommand(cmd, nil, groups...) - - return cmd -} diff --git a/pkg/cmd/migration/cmd_builder_test.go b/pkg/cmd/migration/cmd_builder_test.go deleted file mode 100644 index ccb35767d..000000000 --- a/pkg/cmd/migration/cmd_builder_test.go +++ /dev/null @@ -1,41 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdtesting "k8s.io/kubectl/pkg/cmd/testing" -) - -var _ = Describe("cmd_builder", func() { - var ( - streams genericiooptions.IOStreams - tf *cmdtesting.TestFactory - ) - - It("command build", func() { - cmd := NewMigrationCmd(tf, streams) - Expect(cmd).ShouldNot(BeNil()) - }) - -}) diff --git a/pkg/cmd/migration/create.go b/pkg/cmd/migration/create.go deleted file mode 100644 index 11ed84a6b..000000000 --- a/pkg/cmd/migration/create.go +++ /dev/null @@ -1,293 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - "fmt" - "strings" - "time" - - "github.com/spf13/cobra" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/rand" - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - - "github.com/apecloud/kbcli/pkg/action" - "github.com/apecloud/kbcli/pkg/types" - migrationv1 "github.com/apecloud/kbcli/pkg/types/migrationapi" - "github.com/apecloud/kbcli/pkg/util" -) - -var ( - AllStepsArr = []string{ - migrationv1.CliStepGlobal.String(), - migrationv1.CliStepPreCheck.String(), - migrationv1.CliStepCdc.String(), - migrationv1.CliStepInitStruct.String(), - migrationv1.CliStepInitData.String(), - } -) - -const ( - StringBoolTrue = "true" - StringBoolFalse = "false" -) - -type CreateMigrationOptions struct { - Template string `json:"template"` - TaskType string `json:"taskType,omitempty"` - Source string `json:"source"` - SourceEndpointModel EndpointModel `json:"sourceEndpointModel,omitempty"` - Sink string `json:"sink"` - SinkEndpointModel EndpointModel `json:"sinkEndpointModel,omitempty"` - MigrationObject []string `json:"migrationObject"` - MigrationObjectModel MigrationObjectModel `json:"migrationObjectModel,omitempty"` - Steps []string `json:"steps,omitempty"` - StepsModel []string `json:"stepsModel,omitempty"` - Tolerations []string `json:"tolerations,omitempty"` - TolerationModel map[string][]interface{} `json:"tolerationModel,omitempty"` - Resources []string `json:"resources,omitempty"` - ResourceModel map[string]interface{} `json:"resourceModel,omitempty"` - ServerID uint32 `json:"serverId,omitempty"` - action.CreateOptions `json:"-"` -} - -func NewMigrationCreateCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command { - o := &CreateMigrationOptions{ - CreateOptions: action.CreateOptions{ - Factory: f, - IOStreams: streams, - CueTemplateName: "migration_template.cue", - GVR: types.MigrationTaskGVR(), - }} - o.CreateOptions.Options = o - - cmd := &cobra.Command{ - Use: "create NAME", - Short: "Create a migration task.", - Example: CreateTemplate, - ValidArgsFunction: util.ResourceNameCompletionFunc(f, types.MigrationTaskGVR()), - Run: func(cmd *cobra.Command, args []string) { - o.Args = args - cmdutil.CheckErr(o.Complete()) - cmdutil.CheckErr(o.Validate()) - cmdutil.CheckErr(o.Run()) - }, - } - - cmd.Flags().StringVar(&o.Template, "template", "", "Specify migration template, run \"kbcli migration templates\" to show all available migration templates") - cmd.Flags().StringVar(&o.Source, "source", "", "Set the source database information for migration.such as '{username}:{password}@{connection_address}:{connection_port}/[{database}]'") - cmd.Flags().StringVar(&o.Sink, "sink", "", "Set the sink database information for migration.such as '{username}:{password}@{connection_address}:{connection_port}/[{database}]") - cmd.Flags().StringSliceVar(&o.MigrationObject, "migration-object", []string{}, "Set the data objects that need to be migrated,such as '\"db1.table1\",\"db2\"'") - cmd.Flags().StringSliceVar(&o.Steps, "steps", []string{}, "Set up migration steps,such as: precheck=true,init-struct=true,init-data=true,cdc=true") - cmd.Flags().StringSliceVar(&o.Tolerations, "tolerations", []string{}, "Tolerations for migration, such as '\"key=engineType,value=pg,operator=Equal,effect=NoSchedule\"'") - cmd.Flags().StringSliceVar(&o.Resources, "resources", []string{}, "Resources limit for migration, such as '\"cpu=3000m,memory=3Gi\"'") - - util.CheckErr(cmd.MarkFlagRequired("template")) - util.CheckErr(cmd.MarkFlagRequired("source")) - util.CheckErr(cmd.MarkFlagRequired("sink")) - util.CheckErr(cmd.MarkFlagRequired("migration-object")) - return cmd -} - -func (o *CreateMigrationOptions) Validate() error { - var err error - - if _, err = IsMigrationCrdValidWithDynamic(&o.Dynamic); err != nil { - PrintCrdInvalidError(err) - } - - if o.Template == "" { - return fmt.Errorf("migration template is needed, use \"kbcli migration templates\" to check and special one") - } - - errMsgArr := make([]string, 0) - // Source - o.SourceEndpointModel = EndpointModel{} - if err = o.SourceEndpointModel.BuildFromStr(&errMsgArr, o.Source); err != nil { - return err - } - // Sink - o.SinkEndpointModel = EndpointModel{} - if err = o.SinkEndpointModel.BuildFromStr(&errMsgArr, o.Sink); err != nil { - return err - } - - // MigrationObject - if err = o.MigrationObjectModel.BuildFromStrs(&errMsgArr, o.MigrationObject); err != nil { - return err - } - - // Steps & taskType - if err = o.BuildWithSteps(&errMsgArr); err != nil { - return err - } - - // Tolerations - if err = o.BuildWithTolerations(); err != nil { - return err - } - - // Resources - if err = o.BuildWithResources(); err != nil { - return err - } - - // RuntimeParams - if err = o.BuildWithRuntimeParams(); err != nil { - return err - } - - // Log errors if necessary - if len(errMsgArr) > 0 { - return fmt.Errorf(strings.Join(errMsgArr, ";\n")) - } - return nil -} - -func (o *CreateMigrationOptions) BuildWithSteps(errMsgArr *[]string) error { - taskType := InitializationAndCdc.String() - validStepMap, validStepKey := CliStepChangeToStructure() - enableCdc, enablePreCheck, enableInitStruct, enableInitData := StringBoolTrue, StringBoolTrue, StringBoolTrue, StringBoolTrue - if len(o.Steps) > 0 { - for _, step := range o.Steps { - stepArr := strings.Split(step, "=") - if len(stepArr) != 2 { - BuildErrorMsg(errMsgArr, fmt.Sprintf("[%s] in steps setting is invalid", step)) - return nil - } - stepName := strings.ToLower(strings.TrimSpace(stepArr[0])) - enable := strings.ToLower(strings.TrimSpace(stepArr[1])) - if validStepMap[stepName] == "" { - BuildErrorMsg(errMsgArr, fmt.Sprintf("[%s] in steps settings is invalid, the name should be one of: (%s)", step, validStepKey)) - return nil - } - if enable != StringBoolTrue && enable != StringBoolFalse { - BuildErrorMsg(errMsgArr, fmt.Sprintf("[%s] in steps settings is invalid, the value should be one of: (true false)", step)) - return nil - } - switch stepName { - case migrationv1.CliStepCdc.String(): - enableCdc = enable - case migrationv1.CliStepPreCheck.String(): - enablePreCheck = enable - case migrationv1.CliStepInitStruct.String(): - enableInitStruct = enable - case migrationv1.CliStepInitData.String(): - enableInitData = enable - } - } - - if enableInitData != StringBoolTrue { - BuildErrorMsg(errMsgArr, "step init-data is needed") - return nil - } - if enableCdc == StringBoolTrue { - taskType = InitializationAndCdc.String() - } else { - taskType = Initialization.String() - } - } - o.TaskType = taskType - o.StepsModel = []string{} - if enablePreCheck == StringBoolTrue { - o.StepsModel = append(o.StepsModel, migrationv1.StepPreCheck.String()) - } - if enableInitStruct == StringBoolTrue { - o.StepsModel = append(o.StepsModel, migrationv1.StepStructPreFullLoad.String()) - } - if enableInitData == StringBoolTrue { - o.StepsModel = append(o.StepsModel, migrationv1.StepFullLoad.String()) - } - return nil -} - -func (o *CreateMigrationOptions) BuildWithTolerations() error { - o.TolerationModel = o.buildTolerationOrResources(o.Tolerations) - tmp := make([]interface{}, 0) - for _, step := range AllStepsArr { - if o.TolerationModel[step] == nil { - o.TolerationModel[step] = tmp - } - } - return nil -} - -func (o *CreateMigrationOptions) BuildWithResources() error { - o.ResourceModel = make(map[string]interface{}) - for k, v := range o.buildTolerationOrResources(o.Resources) { - if len(v) >= 1 { - o.ResourceModel[k] = v[0] - } - } - for _, step := range AllStepsArr { - if o.ResourceModel[step] == nil { - o.ResourceModel[step] = v1.ResourceList{} - } - } - return nil -} - -func (o *CreateMigrationOptions) BuildWithRuntimeParams() error { - template := migrationv1.MigrationTemplate{} - templateGvr := types.MigrationTemplateGVR() - if err := APIResource(&o.CreateOptions.Dynamic, &templateGvr, o.Template, "", &template); err != nil { - return err - } - - // Generate random serverId for MySQL type database. Possible values are between 10001 and 2^32-10001 - if template.Spec.Source.DBType == migrationv1.MigrationDBTypeMySQL { - o.ServerID = o.generateRandomMySQLServerID() - } else { - o.ServerID = 10001 - } - - return nil -} - -func (o *CreateMigrationOptions) buildTolerationOrResources(raws []string) map[string][]interface{} { - results := make(map[string][]interface{}) - for _, raw := range raws { - step := migrationv1.CliStepGlobal.String() - tmpMap := map[string]interface{}{} - rawLoop: - for _, entries := range strings.Split(raw, ",") { - parts := strings.SplitN(entries, "=", 2) - k := strings.TrimSpace(parts[0]) - v := strings.TrimSpace(parts[1]) - if k == "step" { - switch v { - case migrationv1.CliStepPreCheck.String(), migrationv1.CliStepCdc.String(), migrationv1.CliStepInitStruct.String(), migrationv1.CliStepInitData.String(): - step = v - } - continue rawLoop - } - tmpMap[k] = v - } - results[step] = append(results[step], tmpMap) - } - return results -} - -func (o *CreateMigrationOptions) generateRandomMySQLServerID() uint32 { - rand.Seed(time.Now().UnixNano()) - return uint32(rand.Int63nRange(10001, 1<<32-10001)) -} diff --git a/pkg/cmd/migration/create_test.go b/pkg/cmd/migration/create_test.go deleted file mode 100644 index e4cbfc7dc..000000000 --- a/pkg/cmd/migration/create_test.go +++ /dev/null @@ -1,183 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - "bytes" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "k8s.io/cli-runtime/pkg/genericiooptions" - "k8s.io/client-go/kubernetes/scheme" - cmdTest "k8s.io/kubectl/pkg/cmd/testing" - - app "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" - - "github.com/apecloud/kbcli/pkg/testing" - v1alpha1 "github.com/apecloud/kbcli/pkg/types/migrationapi" -) - -var ( - streams genericiooptions.IOStreams - out *bytes.Buffer - tf *cmdTest.TestFactory -) - -const ( - namespace = "test" -) - -var _ = Describe("create", func() { - o := &CreateMigrationOptions{} - - BeforeEach(func() { - streams, _, out, _ = genericiooptions.NewTestIOStreams() - tf = testing.NewTestFactory(namespace) - - _ = app.AddToScheme(scheme.Scheme) - - tf.Client = tf.UnstructuredClient - }) - - Context("Input params validate", func() { - var err error - errMsgArr := make([]string, 0, 3) - It("Endpoint with database", func() { - o.Source = "user:123456@127.0.0.1:5432/database" - err = o.SourceEndpointModel.BuildFromStr(&errMsgArr, o.Source) - Expect(err).ShouldNot(HaveOccurred()) - Expect(o.SourceEndpointModel.UserName).Should(Equal("user")) - Expect(o.SourceEndpointModel.Password).Should(Equal("123456")) - Expect(o.SourceEndpointModel.Address).Should(Equal("127.0.0.1:5432")) - Expect(o.SourceEndpointModel.Database).Should(Equal("database")) - Expect(len(errMsgArr)).Should(Equal(0)) - - o.Sink = "user:123456127.0.0.1:5432/database" - err = o.SinkEndpointModel.BuildFromStr(&errMsgArr, o.Sink) - Expect(err).ShouldNot(HaveOccurred()) - Expect(len(errMsgArr)).Should(Equal(1)) - }) - - It("Endpoint with no database", func() { - o.Source = "user:123456@127.0.0.1:3306" - errMsgArr := make([]string, 0, 3) - err = o.SourceEndpointModel.BuildFromStr(&errMsgArr, o.Source) - Expect(err).ShouldNot(HaveOccurred()) - Expect(o.SourceEndpointModel.UserName).Should(Equal("user")) - Expect(o.SourceEndpointModel.Password).Should(Equal("123456")) - Expect(o.SourceEndpointModel.Address).Should(Equal("127.0.0.1:3306")) - Expect(o.SourceEndpointModel.Database).Should(BeEmpty()) - Expect(len(errMsgArr)).Should(Equal(0)) - - o.Sink = "user:123456127.0.0.1:3306" - err = o.SinkEndpointModel.BuildFromStr(&errMsgArr, o.Sink) - Expect(err).ShouldNot(HaveOccurred()) - Expect(len(errMsgArr)).Should(Equal(1)) - }) - - It("MigrationObject", func() { - o.MigrationObject = []string{"schema_public.table1", "schema2.table2_1", "schema2.table2_2", "schema3"} - err = o.MigrationObjectModel.BuildFromStrs(&errMsgArr, o.MigrationObject) - Expect(err).ShouldNot(HaveOccurred()) - for _, obj := range o.MigrationObjectModel.WhiteList { - Expect(obj.SchemaName).Should(BeElementOf("schema_public", "schema2", "schema3")) - switch obj.SchemaName { - case "schema_public": - Expect(len(obj.TableList)).Should(Equal(1)) - Expect(obj.TableList[0].TableName).Should(Equal("table1")) - Expect(obj.TableList[0].IsAll).Should(BeTrue()) - case "schema2": - Expect(len(obj.TableList)).Should(Equal(2)) - for _, tb := range obj.TableList { - Expect(tb.TableName).Should(BeElementOf("table2_1", "table2_2")) - Expect(tb.IsAll).Should(BeTrue()) - } - case "schema3": - Expect(obj.IsAll).Should(BeTrue()) - } - } - }) - - It("Steps", func() { - o.Steps = make([]string, 0) - err = o.BuildWithSteps(&errMsgArr) - Expect(err).ShouldNot(HaveOccurred()) - Expect(o.TaskType).Should(Equal(InitializationAndCdc.String())) - Expect(o.StepsModel).Should(ContainElements(v1alpha1.StepPreCheck.String(), v1alpha1.StepStructPreFullLoad.String(), v1alpha1.StepFullLoad.String())) - o.Steps = []string{"precheck=true", "init-struct=false", "cdc=false"} - err = o.BuildWithSteps(&errMsgArr) - Expect(err).ShouldNot(HaveOccurred()) - Expect(o.TaskType).Should(Equal(Initialization.String())) - Expect(o.StepsModel).Should(ContainElements(v1alpha1.StepPreCheck.String(), v1alpha1.StepFullLoad.String())) - }) - - It("Tolerations", func() { - o.Tolerations = []string{ - "step=global,key=engineType,value=pg,operator=Equal,effect=NoSchedule", - "step=init-data,key=engineType,value=pg1,operator=Equal,effect=NoSchedule", - "key=engineType,value=pg2,operator=Equal,effect=NoSchedule", - } - err = o.BuildWithTolerations() - Expect(err).ShouldNot(HaveOccurred()) - Expect(o.TolerationModel[v1alpha1.CliStepGlobal.String()]).ShouldNot(BeEmpty()) - Expect(o.TolerationModel[v1alpha1.CliStepInitData.String()]).ShouldNot(BeEmpty()) - Expect(len(o.TolerationModel[v1alpha1.CliStepInitData.String()])).Should(Equal(1)) - Expect(len(o.TolerationModel[v1alpha1.CliStepGlobal.String()])).Should(Equal(2)) - Expect(len(o.TolerationModel[v1alpha1.CliStepPreCheck.String()])).Should(Equal(0)) - }) - - It("Resources", func() { - o.Resources = []string{ - "step=global,cpu=1000m,memory=1Gi", - "step=init-data,cpu=2000m,memory=2Gi", - "cpu=3000m,memory=3Gi", - } - err = o.BuildWithResources() - Expect(err).ShouldNot(HaveOccurred()) - Expect(o.ResourceModel[v1alpha1.CliStepGlobal.String()]).ShouldNot(BeEmpty()) - Expect(o.ResourceModel[v1alpha1.CliStepInitData.String()]).ShouldNot(BeEmpty()) - Expect(o.ResourceModel[v1alpha1.CliStepPreCheck.String()]).Should(BeEmpty()) - }) - - It("RuntimeParams", func() { - type void struct{} - var setValue void - serverIDSet := make(map[uint32]void) - - loopCount := 0 - for loopCount < 1000 { - newServerID := o.generateRandomMySQLServerID() - Expect(newServerID >= 10001 && newServerID <= 1<<32-10001).Should(BeTrue()) - serverIDSet[newServerID] = setValue - - loopCount += 1 - } - Expect(len(serverIDSet) > 500).Should(BeTrue()) - }) - }) - - Context("Mock run", func() { - It("test", func() { - cmd := NewMigrationCreateCmd(tf, streams) - Expect(cmd).ShouldNot(BeNil()) - }) - }) -}) diff --git a/pkg/cmd/migration/describe.go b/pkg/cmd/migration/describe.go deleted file mode 100644 index 2fabd732f..000000000 --- a/pkg/cmd/migration/describe.go +++ /dev/null @@ -1,304 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - "context" - "fmt" - "io" - "sort" - "strconv" - "strings" - "time" - - "github.com/spf13/cobra" - appv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/cli-runtime/pkg/genericiooptions" - "k8s.io/client-go/dynamic" - clientset "k8s.io/client-go/kubernetes" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - - "github.com/apecloud/kbcli/pkg/printer" - "github.com/apecloud/kbcli/pkg/types" - v1alpha1 "github.com/apecloud/kbcli/pkg/types/migrationapi" - "github.com/apecloud/kbcli/pkg/util" -) - -var ( - newTbl = func(out io.Writer, title string, header ...interface{}) *printer.TablePrinter { - fmt.Fprintln(out, title) - tbl := printer.NewTablePrinter(out) - tbl.SetHeader(header...) - return tbl - } -) - -type describeOptions struct { - factory cmdutil.Factory - client clientset.Interface - dynamic dynamic.Interface - namespace string - - // resource type and names - gvr schema.GroupVersionResource - names []string - - *v1alpha1.MigrationObjects - genericiooptions.IOStreams -} - -func newOptions(f cmdutil.Factory, streams genericiooptions.IOStreams) *describeOptions { - return &describeOptions{ - factory: f, - IOStreams: streams, - gvr: types.MigrationTaskGVR(), - } -} - -func NewMigrationDescribeCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command { - o := newOptions(f, streams) - cmd := &cobra.Command{ - Use: "describe NAME", - Short: "Show details of a specific migration task.", - Example: DescribeExample, - ValidArgsFunction: util.ResourceNameCompletionFunc(f, types.MigrationTaskGVR()), - Run: func(cmd *cobra.Command, args []string) { - util.CheckErr(o.complete(args)) - util.CheckErr(o.run()) - }, - } - return cmd -} - -func (o *describeOptions) complete(args []string) error { - var err error - - if o.client, err = o.factory.KubernetesClientSet(); err != nil { - return err - } - - if o.dynamic, err = o.factory.DynamicClient(); err != nil { - return err - } - - if o.namespace, _, err = o.factory.ToRawKubeConfigLoader().Namespace(); err != nil { - return err - } - - if _, err = IsMigrationCrdValidWithDynamic(&o.dynamic); err != nil { - PrintCrdInvalidError(err) - } - - if len(args) == 0 { - return fmt.Errorf("migration task name should be specified") - } - o.names = args - return nil -} - -func (o *describeOptions) run() error { - for _, name := range o.names { - if err := o.describeMigration(name); err != nil { - return err - } - } - return nil -} - -func (o *describeOptions) describeMigration(name string) error { - var err error - if o.MigrationObjects, err = getMigrationObjects(o, name); err != nil { - return err - } - - // MigrationTask Summary - showTaskSummary(o.Task, o.Out) - - // MigrationTask Config - showTaskConfig(o.Task, o.Out) - - // MigrationTemplate Summary - showTemplateSummary(o.Template, o.Out) - - // Initialization Detail - showInitialization(o.Task, o.Template, o.Jobs, o.Out) - - switch o.Task.Spec.TaskType { - case v1alpha1.InitializationAndCdc, v1alpha1.CDC: - // Cdc Detail - showCdc(o.StatefulSets, o.Pods, o.Out) - - // Cdc Metrics - showCdcMetrics(o.Task, o.Out) - } - - fmt.Fprintln(o.Out) - - return nil -} - -func getMigrationObjects(o *describeOptions, taskName string) (*v1alpha1.MigrationObjects, error) { - obj := &v1alpha1.MigrationObjects{ - Task: &v1alpha1.MigrationTask{}, - Template: &v1alpha1.MigrationTemplate{}, - } - var err error - taskGvr := types.MigrationTaskGVR() - if err = APIResource(&o.dynamic, &taskGvr, taskName, o.namespace, obj.Task); err != nil { - return nil, err - } - templateGvr := types.MigrationTemplateGVR() - if err = APIResource(&o.dynamic, &templateGvr, obj.Task.Spec.Template, "", obj.Template); err != nil { - return nil, err - } - listOpts := func() metav1.ListOptions { - return metav1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s", MigrationTaskLabel, taskName), - } - } - if obj.Jobs, err = o.client.BatchV1().Jobs(o.namespace).List(context.Background(), listOpts()); err != nil { - return nil, err - } - if obj.Pods, err = o.client.CoreV1().Pods(o.namespace).List(context.Background(), listOpts()); err != nil { - return nil, err - } - if obj.StatefulSets, err = o.client.AppsV1().StatefulSets(o.namespace).List(context.Background(), listOpts()); err != nil { - return nil, err - } - return obj, nil -} - -func showTaskSummary(task *v1alpha1.MigrationTask, out io.Writer) { - if task == nil { - return - } - title := fmt.Sprintf("Name: %s\t Status: %s", task.Name, task.Status.TaskStatus) - tbl := newTbl(out, title, "NAMESPACE", "CREATED-TIME", "START-TIME", "FINISHED-TIME") - tbl.AddRow(task.Namespace, util.TimeFormatWithDuration(&task.CreationTimestamp, time.Second), util.TimeFormatWithDuration(task.Status.StartTime, time.Second), util.TimeFormatWithDuration(task.Status.FinishTime, time.Second)) - tbl.Print() -} - -func showTaskConfig(task *v1alpha1.MigrationTask, out io.Writer) { - if task == nil { - return - } - tbl := newTbl(out, "\nMigration Config:") - tbl.AddRow("source", fmt.Sprintf("%s:%s@%s/%s", - task.Spec.SourceEndpoint.UserName, - task.Spec.SourceEndpoint.Password, - task.Spec.SourceEndpoint.Address, - task.Spec.SourceEndpoint.DatabaseName, - )) - tbl.AddRow("sink", fmt.Sprintf("%s:%s@%s/%s", - task.Spec.SinkEndpoint.UserName, - task.Spec.SinkEndpoint.Password, - task.Spec.SinkEndpoint.Address, - task.Spec.SinkEndpoint.DatabaseName, - )) - tbl.AddRow("migration objects", task.Spec.MigrationObj.String(true)) - tbl.Print() -} - -func showTemplateSummary(template *v1alpha1.MigrationTemplate, out io.Writer) { - if template == nil { - return - } - title := fmt.Sprintf("\nTemplate: %s\t", template.Name) - tbl := newTbl(out, title, "DATABASE-MAPPING", "STATUS") - tbl.AddRow(template.Spec.Description, template.Status.Phase) - tbl.Print() -} - -func showInitialization(task *v1alpha1.MigrationTask, template *v1alpha1.MigrationTemplate, jobList *batchv1.JobList, out io.Writer) { - if len(jobList.Items) == 0 { - return - } - sort.SliceStable(jobList.Items, func(i, j int) bool { - jobName1 := jobList.Items[i].Name - jobName2 := jobList.Items[j].Name - order1, _ := strconv.ParseInt(string([]byte(jobName1)[strings.LastIndex(jobName1, "-")+1:]), 10, 8) - order2, _ := strconv.ParseInt(string([]byte(jobName2)[strings.LastIndex(jobName2, "-")+1:]), 10, 8) - return order1 < order2 - }) - cliStepOrder := BuildInitializationStepsOrder(task, template) - tbl := newTbl(out, "\nInitialization:", "STEP", "NAMESPACE", "STATUS", "CREATED_TIME", "START-TIME", "FINISHED-TIME") - if len(cliStepOrder) != len(jobList.Items) { - return - } - for i, job := range jobList.Items { - tbl.AddRow(cliStepOrder[i], job.Namespace, getJobStatus(job.Status.Conditions), util.TimeFormatWithDuration(&job.CreationTimestamp, time.Second), util.TimeFormatWithDuration(job.Status.StartTime, time.Second), util.TimeFormatWithDuration(job.Status.CompletionTime, time.Second)) - } - tbl.Print() -} - -func showCdc(statefulSets *appv1.StatefulSetList, pods *v1.PodList, out io.Writer) { - if len(pods.Items) == 0 || len(statefulSets.Items) == 0 { - return - } - tbl := newTbl(out, "\nCdc:", "NAMESPACE", "STATUS", "CREATED_TIME", "START-TIME") - for _, pod := range pods.Items { - if pod.Annotations[MigrationTaskStepAnnotation] != v1alpha1.StepCdc.String() { - continue - } - tbl.AddRow(pod.Namespace, getCdcStatus(&statefulSets.Items[0], &pod), util.TimeFormatWithDuration(&pod.CreationTimestamp, time.Second), util.TimeFormatWithDuration(pod.Status.StartTime, time.Second)) - } - tbl.Print() -} - -func showCdcMetrics(task *v1alpha1.MigrationTask, out io.Writer) { - if task.Status.Cdc.Metrics == nil || len(task.Status.Cdc.Metrics) == 0 { - return - } - arr := make([]string, 0) - for mKey := range task.Status.Cdc.Metrics { - arr = append(arr, mKey) - } - sort.Strings(arr) - tbl := newTbl(out, "\nCdc Metrics:") - for _, k := range arr { - tbl.AddRow(k, task.Status.Cdc.Metrics[k]) - } - tbl.Print() -} - -func getJobStatus(conditions []batchv1.JobCondition) string { - if len(conditions) == 0 { - return "-" - } else { - return string(conditions[len(conditions)-1].Type) - } -} - -func getCdcStatus(statefulSet *appv1.StatefulSet, cdcPod *v1.Pod) v1.PodPhase { - if cdcPod.Status.Phase == v1.PodRunning && - statefulSet.Status.Replicas > statefulSet.Status.AvailableReplicas { - if time.Now().Unix()-statefulSet.CreationTimestamp.Time.Unix() < 10*60 { - return v1.PodPending - } else { - return v1.PodFailed - } - } else { - return cdcPod.Status.Phase - } -} diff --git a/pkg/cmd/migration/describe_test.go b/pkg/cmd/migration/describe_test.go deleted file mode 100644 index a92299af7..000000000 --- a/pkg/cmd/migration/describe_test.go +++ /dev/null @@ -1,79 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - "time" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - appv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdtesting "k8s.io/kubectl/pkg/cmd/testing" -) - -var _ = Describe("describe", func() { - - var ( - streams genericiooptions.IOStreams - tf *cmdtesting.TestFactory - ) - - It("command build", func() { - cmd := NewMigrationDescribeCmd(tf, streams) - Expect(cmd).ShouldNot(BeNil()) - }) - - It("func test", func() { - sts := appv1.StatefulSet{ - Status: appv1.StatefulSetStatus{ - Replicas: 1, - }, - } - pod := corev1.Pod{} - - sts.Status.AvailableReplicas = 0 - pod.Status.Phase = corev1.PodFailed - Expect(getCdcStatus(&sts, &pod)).Should(Equal(corev1.PodFailed)) - - sts.Status.AvailableReplicas = 1 - pod.Status.Phase = corev1.PodPending - Expect(getCdcStatus(&sts, &pod)).Should(Equal(corev1.PodPending)) - - sts.Status.AvailableReplicas = 1 - pod.Status.Phase = corev1.PodRunning - Expect(getCdcStatus(&sts, &pod)).Should(Equal(corev1.PodRunning)) - - sts.Status.AvailableReplicas = 0 - t1, _ := time.ParseDuration("-30m") - sts.CreationTimestamp = v1.NewTime(time.Now().Add(t1)) - pod.Status.Phase = corev1.PodRunning - Expect(getCdcStatus(&sts, &pod)).Should(Equal(corev1.PodFailed)) - - sts.Status.AvailableReplicas = 0 - sts.CreationTimestamp = v1.NewTime(time.Now()) - pod.Status.Phase = corev1.PodRunning - Expect(getCdcStatus(&sts, &pod)).Should(Equal(corev1.PodPending)) - }) - -}) diff --git a/pkg/cmd/migration/examples.go b/pkg/cmd/migration/examples.go deleted file mode 100644 index c7a6d1640..000000000 --- a/pkg/cmd/migration/examples.go +++ /dev/null @@ -1,107 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import "k8s.io/kubectl/pkg/util/templates" - -// Cli Migration Command Examples -var ( - CreateTemplate = templates.Examples(` - # Create a migration task to migrate the entire database under mysql: mydb1 and mytable1 under database: mydb2 to the target mysql - kbcli migration create mytask --template apecloud-mysql2mysql - --source user:123456@127.0.0.1:3306 - --sink user:123456@127.0.0.1:3305 - --migration-object '"mydb1","mydb2.mytable1"' - - # Create a migration task to migrate the schema: myschema under database: mydb1 under PostgreSQL to the target PostgreSQL - kbcli migration create mytask --template apecloud-pg2pg - --source user:123456@127.0.0.1:3306/mydb1 - --sink user:123456@127.0.0.1:3305/mydb1 - --migration-object '"myschema"' - - # Use prechecks, data initialization, CDC, but do not perform structure initialization - kbcli migration create mytask --template apecloud-pg2pg - --source user:123456@127.0.0.1:3306/mydb1 - --sink user:123456@127.0.0.1:3305/mydb1 - --migration-object '"myschema"' - --steps precheck=true,init-struct=false,init-data=true,cdc=true - - # Create a migration task with two tolerations - kbcli migration create mytask --template apecloud-pg2pg - --source user:123456@127.0.0.1:3306/mydb1 - --sink user:123456@127.0.0.1:3305/mydb1 - --migration-object '"myschema"' - --tolerations '"step=global,key=engineType,value=pg,operator=Equal,effect=NoSchedule","step=init-data,key=diskType,value=ssd,operator=Equal,effect=NoSchedule"' - - # Limit resource usage when performing data initialization - kbcli migration create mytask --template apecloud-pg2pg - --source user:123456@127.0.0.1:3306/mydb1 - --sink user:123456@127.0.0.1:3305/mydb1 - --migration-object '"myschema"' - --resources '"step=init-data,cpu=1000m,memory=1Gi"' - `) - DescribeExample = templates.Examples(` - # describe a specified migration task - kbcli migration describe mytask - `) - ListExample = templates.Examples(` - # list all migration tasks - kbcli migration list - - # list a single migration task with specified NAME - kbcli migration list mytask - - # list a single migration task in YAML output format - kbcli migration list mytask -o yaml - - # list a single migration task in JSON output format - kbcli migration list mytask -o json - - # list a single migration task in wide output format - kbcli migration list mytask -o wide - `) - TemplateExample = templates.Examples(` - # list all migration templates - kbcli migration templates - - # list a single migration template with specified NAME - kbcli migration templates mytemplate - - # list a single migration template in YAML output format - kbcli migration templates mytemplate -o yaml - - # list a single migration template in JSON output format - kbcli migration templates mytemplate -o json - - # list a single migration template in wide output format - kbcli migration templates mytemplate -o wide - `) - DeleteExample = templates.Examples(` - # terminate a migration task named mytask and delete resources in k8s without affecting source and target data in database - kbcli migration terminate mytask - `) - LogsExample = templates.Examples(` - # Logs when returning to the "init-struct" step from the migration task mytask - kbcli migration logs mytask --step init-struct - - # Logs only the most recent 20 lines when returning to the "cdc" step from the migration task mytask - kbcli migration logs mytask --step cdc --tail=20 - `) -) diff --git a/pkg/cmd/migration/list.go b/pkg/cmd/migration/list.go deleted file mode 100644 index b85fcb37f..000000000 --- a/pkg/cmd/migration/list.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - "github.com/spf13/cobra" - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - - "github.com/apecloud/kbcli/pkg/action" - "github.com/apecloud/kbcli/pkg/types" - "github.com/apecloud/kbcli/pkg/util" -) - -func NewMigrationListCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command { - o := action.NewListOptions(f, streams, types.MigrationTaskGVR()) - cmd := &cobra.Command{ - Use: "list [NAME]", - Short: "List migration tasks.", - Example: ListExample, - Aliases: []string{"ls"}, - ValidArgsFunction: util.ResourceNameCompletionFunc(f, o.GVR), - Run: func(cmd *cobra.Command, args []string) { - _, validErr := IsMigrationCrdValidWithFactory(o.Factory) - PrintCrdInvalidError(validErr) - o.Names = args - _, err := o.Run() - util.CheckErr(err) - }, - } - o.AddFlags(cmd) - return cmd -} diff --git a/pkg/cmd/migration/list_test.go b/pkg/cmd/migration/list_test.go deleted file mode 100644 index 343251213..000000000 --- a/pkg/cmd/migration/list_test.go +++ /dev/null @@ -1,42 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdtesting "k8s.io/kubectl/pkg/cmd/testing" -) - -var _ = Describe("list", func() { - - var ( - streams genericiooptions.IOStreams - tf *cmdtesting.TestFactory - ) - - It("command build", func() { - cmd := NewMigrationListCmd(tf, streams) - Expect(cmd).ShouldNot(BeNil()) - }) - -}) diff --git a/pkg/cmd/migration/logs.go b/pkg/cmd/migration/logs.go deleted file mode 100644 index 143f37aa8..000000000 --- a/pkg/cmd/migration/logs.go +++ /dev/null @@ -1,228 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - "context" - "fmt" - "strconv" - "strings" - "time" - - "github.com/spf13/cobra" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/cli-runtime/pkg/genericiooptions" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" - cmdlogs "k8s.io/kubectl/pkg/cmd/logs" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - "k8s.io/kubectl/pkg/polymorphichelpers" - - "github.com/apecloud/kbcli/pkg/action" - "github.com/apecloud/kbcli/pkg/types" - migrationv1 "github.com/apecloud/kbcli/pkg/types/migrationapi" - "github.com/apecloud/kbcli/pkg/util" -) - -type LogsOptions struct { - taskName string - step string - Client *kubernetes.Clientset - Dynamic dynamic.Interface - *action.ExecOptions - logOptions cmdlogs.LogsOptions -} - -func NewMigrationLogsCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command { - l := &LogsOptions{ - ExecOptions: action.NewExecOptions(f, streams), - logOptions: cmdlogs.LogsOptions{ - Tail: -1, - IOStreams: streams, - }, - } - cmd := &cobra.Command{ - Use: "logs NAME", - Short: "Access migration task log file.", - Example: LogsExample, - ValidArgsFunction: util.ResourceNameCompletionFunc(f, types.MigrationTaskGVR()), - Run: func(cmd *cobra.Command, args []string) { - util.CheckErr(l.ExecOptions.Complete()) - util.CheckErr(l.complete(f, cmd, args)) - util.CheckErr(l.validate()) - util.CheckErr(l.runLogs()) - }, - } - l.addFlags(cmd) - return cmd -} - -func (o *LogsOptions) addFlags(cmd *cobra.Command) { - cmd.Flags().StringVar(&o.step, "step", "", "Specify the step. Allow values: precheck,init-struct,init-data,cdc") - - o.logOptions.AddFlags(cmd) -} - -// complete customs complete function for logs -func (o *LogsOptions) complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error { - if len(args) == 0 { - return fmt.Errorf("migration task name should be specified") - } - if len(args) > 0 { - o.taskName = args[0] - } - if o.step == "" { - return fmt.Errorf("migration task step should be specified") - } - var err error - o.logOptions.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace() - if err != nil { - return err - } - - o.Dynamic, err = f.DynamicClient() - if err != nil { - return err - } - - o.Client, err = f.KubernetesClientSet() - if err != nil { - return err - } - - if _, err = IsMigrationCrdValidWithDynamic(&o.Dynamic); err != nil { - PrintCrdInvalidError(err) - } - - taskObj, err := o.getMigrationObjects(o.taskName) - if err != nil { - return fmt.Errorf("failed to find the migrationtask") - } - pod := o.getPodByStep(taskObj, strings.TrimSpace(o.step)) - if pod == nil { - return fmt.Errorf("migrationtask[%s] step[%s] 's pod not found", taskObj.Task.Name, o.step) - } - o.logOptions.RESTClientGetter = f - o.logOptions.LogsForObject = polymorphichelpers.LogsForObjectFn - o.logOptions.Object = pod - o.logOptions.Options, _ = o.logOptions.ToLogOptions() - o.Pod = pod - - return nil -} - -func (o *LogsOptions) validate() error { - if len(o.taskName) == 0 { - return fmt.Errorf("migration task name must be specified") - } - - if o.logOptions.LimitBytes < 0 { - return fmt.Errorf("--limit-bytes must be greater than 0") - } - if o.logOptions.Tail < -1 { - return fmt.Errorf("--tail must be greater than or equal to -1") - } - if len(o.logOptions.SinceTime) > 0 && o.logOptions.SinceSeconds != 0 { - return fmt.Errorf("at most one of `sinceTime` or `sinceSeconds` may be specified") - } - logsOptions, ok := o.logOptions.Options.(*corev1.PodLogOptions) - if !ok { - return fmt.Errorf("unexpected logs options object") - } - if logsOptions.SinceSeconds != nil && *logsOptions.SinceSeconds < int64(0) { - return fmt.Errorf("--since must be greater than 0") - } - if logsOptions.TailLines != nil && *logsOptions.TailLines < -1 { - return fmt.Errorf("--tail must be greater than or equal to -1") - } - return nil -} - -func (o *LogsOptions) getMigrationObjects(taskName string) (*migrationv1.MigrationObjects, error) { - obj := &migrationv1.MigrationObjects{ - Task: &migrationv1.MigrationTask{}, - Template: &migrationv1.MigrationTemplate{}, - } - var err error - taskGvr := types.MigrationTaskGVR() - if err = APIResource(&o.Dynamic, &taskGvr, taskName, o.logOptions.Namespace, obj.Task); err != nil { - return nil, err - } - templateGvr := types.MigrationTemplateGVR() - if err = APIResource(&o.Dynamic, &templateGvr, obj.Task.Spec.Template, "", obj.Template); err != nil { - return nil, err - } - listOpts := func() metav1.ListOptions { - return metav1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s", MigrationTaskLabel, taskName), - } - } - if obj.Pods, err = o.Client.CoreV1().Pods(o.logOptions.Namespace).List(context.Background(), listOpts()); err != nil { - return nil, err - } - return obj, nil -} - -func (o *LogsOptions) runLogs() error { - requests, err := o.logOptions.LogsForObject(o.logOptions.RESTClientGetter, o.logOptions.Object, o.logOptions.Options, 60*time.Second, false) - if err != nil { - return err - } - for _, request := range requests { - if err := cmdlogs.DefaultConsumeRequest(request, o.Out); err != nil { - if !o.logOptions.IgnoreLogErrors { - return err - } - fmt.Fprintf(o.Out, "error: %v\n", err) - } - } - return nil -} - -func (o *LogsOptions) getPodByStep(taskObj *migrationv1.MigrationObjects, step string) *corev1.Pod { - if taskObj == nil || len(taskObj.Pods.Items) == 0 { - return nil - } - switch step { - case migrationv1.CliStepCdc.String(): - for _, pod := range taskObj.Pods.Items { - if pod.Annotations[MigrationTaskStepAnnotation] == migrationv1.StepCdc.String() { - return &pod - } - } - case migrationv1.CliStepPreCheck.String(), migrationv1.CliStepInitStruct.String(), migrationv1.CliStepInitData.String(): - stepArr := BuildInitializationStepsOrder(taskObj.Task, taskObj.Template) - orderNo := "-1" - for index, stepByTemplate := range stepArr { - if step == stepByTemplate { - orderNo = strconv.Itoa(index) - break - } - } - for _, pod := range taskObj.Pods.Items { - if pod.Annotations[SerialJobOrderAnnotation] != "" && - pod.Annotations[SerialJobOrderAnnotation] == orderNo { - return &pod - } - } - } - return nil -} diff --git a/pkg/cmd/migration/logs_test.go b/pkg/cmd/migration/logs_test.go deleted file mode 100644 index a0c2502fa..000000000 --- a/pkg/cmd/migration/logs_test.go +++ /dev/null @@ -1,42 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdtesting "k8s.io/kubectl/pkg/cmd/testing" -) - -var _ = Describe("logs", func() { - - var ( - streams genericiooptions.IOStreams - tf *cmdtesting.TestFactory - ) - - It("command build", func() { - cmd := NewMigrationLogsCmd(tf, streams) - Expect(cmd).ShouldNot(BeNil()) - }) - -}) diff --git a/pkg/cmd/migration/suite_test.go b/pkg/cmd/migration/suite_test.go deleted file mode 100644 index b63674016..000000000 --- a/pkg/cmd/migration/suite_test.go +++ /dev/null @@ -1,32 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration_test - -import ( - "testing" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -func TestMigration(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Migration Suite") -} diff --git a/pkg/cmd/migration/templates.go b/pkg/cmd/migration/templates.go deleted file mode 100644 index 7b41a0762..000000000 --- a/pkg/cmd/migration/templates.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - "github.com/spf13/cobra" - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - - "github.com/apecloud/kbcli/pkg/action" - "github.com/apecloud/kbcli/pkg/types" - "github.com/apecloud/kbcli/pkg/util" -) - -func NewMigrationTemplatesCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command { - o := action.NewListOptions(f, streams, types.MigrationTemplateGVR()) - cmd := &cobra.Command{ - Use: "templates [NAME]", - Short: "List migration templates.", - Example: TemplateExample, - Aliases: []string{"tp", "template"}, - ValidArgsFunction: util.ResourceNameCompletionFunc(f, o.GVR), - Run: func(cmd *cobra.Command, args []string) { - _, validErr := IsMigrationCrdValidWithFactory(o.Factory) - PrintCrdInvalidError(validErr) - o.Names = args - _, err := o.Run() - util.CheckErr(err) - }, - } - o.AddFlags(cmd) - return cmd -} diff --git a/pkg/cmd/migration/templates_test.go b/pkg/cmd/migration/templates_test.go deleted file mode 100644 index b4e215696..000000000 --- a/pkg/cmd/migration/templates_test.go +++ /dev/null @@ -1,42 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdtesting "k8s.io/kubectl/pkg/cmd/testing" -) - -var _ = Describe("templates", func() { - - var ( - streams genericiooptions.IOStreams - tf *cmdtesting.TestFactory - ) - - It("command build", func() { - cmd := NewMigrationTemplatesCmd(tf, streams) - Expect(cmd).ShouldNot(BeNil()) - }) - -}) diff --git a/pkg/cmd/migration/terminate.go b/pkg/cmd/migration/terminate.go deleted file mode 100644 index 77a5bf650..000000000 --- a/pkg/cmd/migration/terminate.go +++ /dev/null @@ -1,57 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - "fmt" - - "github.com/spf13/cobra" - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - - "github.com/apecloud/kbcli/pkg/action" - "github.com/apecloud/kbcli/pkg/types" - "github.com/apecloud/kbcli/pkg/util" -) - -func NewMigrationTerminateCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command { - o := action.NewDeleteOptions(f, streams, types.MigrationTaskGVR()) - cmd := &cobra.Command{ - Use: "terminate NAME", - Short: "Delete migration task.", - Example: DeleteExample, - ValidArgsFunction: util.ResourceNameCompletionFunc(f, types.MigrationTaskGVR()), - Run: func(cmd *cobra.Command, args []string) { - _, validErr := IsMigrationCrdValidWithFactory(o.Factory) - PrintCrdInvalidError(validErr) - util.CheckErr(deleteMigrationTask(o, args)) - }, - } - o.AddFlags(cmd) - return cmd -} - -func deleteMigrationTask(o *action.DeleteOptions, args []string) error { - if len(args) == 0 { - return fmt.Errorf("missing migration task name") - } - o.Names = args - return o.Run() -} diff --git a/pkg/cmd/migration/terminate_test.go b/pkg/cmd/migration/terminate_test.go deleted file mode 100644 index 125ff5983..000000000 --- a/pkg/cmd/migration/terminate_test.go +++ /dev/null @@ -1,42 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package migration - -import ( - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "k8s.io/cli-runtime/pkg/genericiooptions" - cmdtesting "k8s.io/kubectl/pkg/cmd/testing" -) - -var _ = Describe("terminate", func() { - - var ( - streams genericiooptions.IOStreams - tf *cmdtesting.TestFactory - ) - - It("command build", func() { - cmd := NewMigrationTerminateCmd(tf, streams) - Expect(cmd).ShouldNot(BeNil()) - }) - -}) diff --git a/pkg/types/migrationapi/migration_object_express.go b/pkg/types/migrationapi/migration_object_express.go deleted file mode 100644 index 2498cc755..000000000 --- a/pkg/types/migrationapi/migration_object_express.go +++ /dev/null @@ -1,96 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package v1alpha1 - -import ( - "fmt" - "strings" -) - -type MigrationObjectExpress struct { - WhiteList []DBObjectExpress `json:"whiteList"` - // +optional - BlackList []DBObjectExpress `json:"blackList"` -} - -func (m *MigrationObjectExpress) String(isWhite bool) string { - expressArr := m.WhiteList - if !isWhite { - expressArr = m.BlackList - } - stringArr := make([]string, 0) - for _, db := range expressArr { - stringArr = append(stringArr, db.String()...) - } - return strings.Join(stringArr, ",") -} - -type DBObjectExpress struct { - SchemaName string `json:"schemaName"` - // +optional - SchemaMappingName string `json:"schemaMappingName"` - // +optional - IsAll bool `json:"isAll"` - // +optional - TableList []TableObjectExpress `json:"tableList"` - DxlOpConfig `json:""` -} - -func (d *DBObjectExpress) String() []string { - stringArr := make([]string, 0) - if d.IsAll { - stringArr = append(stringArr, d.SchemaName) - } else { - for _, tb := range d.TableList { - stringArr = append(stringArr, fmt.Sprintf("%s.%s", d.SchemaName, tb.TableName)) - } - } - return stringArr -} - -type TableObjectExpress struct { - TableName string `json:"tableName"` - // +optional - TableMappingName string `json:"tableMappingName"` - // +optional - IsAll bool `json:"isAll"` - // +optional - FieldList []FieldObjectExpress `json:"fieldList"` - DxlOpConfig `json:""` -} - -type FieldObjectExpress struct { - FieldName string `json:"fieldName"` - // +optional - FieldMappingName string `json:"fieldMappingName"` -} - -type DxlOpConfig struct { - // +optional - DmlOp []DMLOpEnum `json:"dmlOp"` - // +optional - DdlOp []DDLOpEnum `json:"ddlOp"` - // +optional - DclOp []DCLOpEnum `json:"dclOp"` -} - -func (op *DxlOpConfig) IsEmpty() bool { - return len(op.DmlOp) == 0 -} diff --git a/pkg/types/migrationapi/migrationtask_types.go b/pkg/types/migrationapi/migrationtask_types.go deleted file mode 100644 index eeb96babd..000000000 --- a/pkg/types/migrationapi/migrationtask_types.go +++ /dev/null @@ -1,156 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package v1alpha1 - -import ( - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -// MigrationTaskSpec defines the desired state of MigrationTask -type MigrationTaskSpec struct { - TaskType TaskTypeEnum `json:"taskType,omitempty"` - Template string `json:"template"` - SourceEndpoint Endpoint `json:"sourceEndpoint,omitempty"` - SinkEndpoint Endpoint `json:"sinkEndpoint,omitempty"` - // +optional - Cdc CdcConfig `json:"cdc,omitempty"` - // +optional - Initialization InitializationConfig `json:"initialization,omitempty"` - MigrationObj MigrationObjectExpress `json:"migrationObj,omitempty"` - // +optional - IsForceDelete bool `json:"isForceDelete,omitempty"` - // +optional - GlobalTolerations []v1.Toleration `json:"globalTolerations,omitempty"` - // +optional - GlobalResources v1.ResourceRequirements `json:"globalResources,omitempty"` -} - -type Endpoint struct { - // +optional - EndpointType EndpointTypeEnum `json:"endpointType,omitempty"` - Address string `json:"address"` - // +optional - DatabaseName string `json:"databaseName,omitempty"` - // +optional - UserName string `json:"userName"` - // +optional - Password string `json:"password"` - // +optional - Secret UserPswSecret `json:"secret"` -} - -type UserPswSecret struct { - Name string `json:"name"` - // +optional - Namespace string `json:"namespace,omitempty"` - // +optional - UserKeyword string `json:"userKeyword,omitempty"` - // +optional - PasswordKeyword string `json:"passwordKeyword,omitempty"` -} - -type CdcConfig struct { - // +optional - Config BaseConfig `json:"config"` -} - -type InitializationConfig struct { - // +optional - Steps []StepEnum `json:"steps,omitempty"` - // +optional - Config map[StepEnum]BaseConfig `json:"config,omitempty"` -} - -type BaseConfig struct { - // +optional - Resource v1.ResourceRequirements `json:"resource,omitempty"` - // +optional - Tolerations []v1.Toleration `json:"tolerations,omitempty"` - // +optional - // +kubebuilder:pruning:PreserveUnknownFields - // +kubebuilder:validation:Schemaless - Param IntOrStringMap `json:"param"` - // +optional - PersistentVolumeClaimName string `json:"persistentVolumeClaimName"` - // +optional - Metrics Metrics `json:"metrics,omitempty"` -} - -// MigrationTaskStatus defines the observed state of MigrationTask -type MigrationTaskStatus struct { - // +optional - TaskStatus TaskStatus `json:"taskStatus"` - // +optional - StartTime *metav1.Time `json:"startTime"` - // +optional - FinishTime *metav1.Time `json:"finishTime"` - // +optional - Cdc RunTimeStatus `json:"cdc"` - // +optional - Initialization RunTimeStatus `json:"initialization"` -} - -type RunTimeStatus struct { - // +optional - StartTime *metav1.Time `json:"startTime"` - // +optional - FinishTime *metav1.Time `json:"finishTime"` - // +optional - // +kubebuilder:pruning:PreserveUnknownFields - // +kubebuilder:validation:Schemaless - RunTimeParam IntOrStringMap `json:"runTimeParam,omitempty"` - // +optional - // +kubebuilder:pruning:PreserveUnknownFields - // +kubebuilder:validation:Schemaless - Metrics IntOrStringMap `json:"metrics,omitempty"` - // +optional - FailedReason string `json:"failedReason,omitempty"` -} - -// +kubebuilder:object:root=true -// +kubebuilder:subresource:status -// +kubebuilder:resource:categories={dtplatform},scope=Cluster,shortName=mt -// +kubebuilder:printcolumn:name="TEMPLATE",type="string",JSONPath=".spec.template",description="spec migration template" -// +kubebuilder:printcolumn:name="STATUS",type="string",JSONPath=".status.taskStatus",description="status taskStatus" -// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp" - -// MigrationTask is the Schema for the migrationTasks API -type MigrationTask struct { - metav1.TypeMeta `json:",inline"` - metav1.ObjectMeta `json:"metadata,omitempty"` - - Spec MigrationTaskSpec `json:"spec,omitempty"` - Status MigrationTaskStatus `json:"status,omitempty"` -} - -// +kubebuilder:object:root=true - -// MigrationTaskList contains a list of MigrationTask -type MigrationTaskList struct { - metav1.TypeMeta `json:",inline"` - metav1.ListMeta `json:"metadata,omitempty"` - Items []MigrationTask `json:"items"` -} - -type Metrics struct { - IsDisable bool `json:"isDisable,omitempty"` - PeriodSeconds int32 `json:"periodSeconds,omitempty"` -} diff --git a/pkg/types/migrationapi/migrationtemplate_types.go b/pkg/types/migrationapi/migrationtemplate_types.go deleted file mode 100644 index d8567d491..000000000 --- a/pkg/types/migrationapi/migrationtemplate_types.go +++ /dev/null @@ -1,106 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package v1alpha1 - -import ( - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -// MigrationTemplateSpec defines the desired state of MigrationTemplate -type MigrationTemplateSpec struct { - TaskType []TaskTypeEnum `json:"taskType,omitempty"` - Source DBTypeSupport `json:"source"` - Sink DBTypeSupport `json:"target"` - Initialization InitializationModel `json:"initialization,omitempty"` - Cdc CdcModel `json:"cdc,omitempty"` - // +optional - Description string `json:"description,omitempty"` - // +optional - Decorator string `json:"decorator,omitempty"` -} - -type DBTypeSupport struct { - DBType DBTypeEnum `json:"dbType"` - DBVersion string `json:"dbVersion"` -} - -type InitializationModel struct { - // +optional - IsPositionPreparation bool `json:"isPositionPreparation,omitempty"` - Steps []StepModel `json:"steps,omitempty"` -} - -type StepModel struct { - Step StepEnum `json:"step"` - Container BasicContainerTemplate `json:"container"` - // +optional - // +kubebuilder:pruning:PreserveUnknownFields - // +kubebuilder:validation:Schemaless - Param IntOrStringMap `json:"param"` -} - -type CdcModel struct { - Container BasicContainerTemplate `json:"container"` - // +optional - Replicas *int32 `json:"replicas,omitempty"` - // +optional - // +kubebuilder:pruning:PreserveUnknownFields - // +kubebuilder:validation:Schemaless - Param IntOrStringMap `json:"param"` -} - -type BasicContainerTemplate struct { - Image string `json:"image"` - // +optional - Command []string `json:"command,omitempty"` - // +optional - Env []v1.EnvVar `json:"env,omitempty"` -} - -// MigrationTemplateStatus defines the observed state of MigrationTemplate -type MigrationTemplateStatus struct { - Phase Phase `json:"phase,omitempty"` -} - -// +kubebuilder:object:root=true -// +kubebuilder:subresource:status -// +kubebuilder:resource:categories={dtplatform},scope=Cluster,shortName=mtp -// +kubebuilder:printcolumn:name="DATABASE-MAPPING",type="string",JSONPath=".spec.description",description="the database mapping that supported" -// +kubebuilder:printcolumn:name="STATUS",type="string",JSONPath=".status.phase",description="the template status" -// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp" - -// MigrationTemplate is the Schema for the migrationtemplates API -type MigrationTemplate struct { - metav1.TypeMeta `json:",inline"` - metav1.ObjectMeta `json:"metadata,omitempty"` - - Spec MigrationTemplateSpec `json:"spec,omitempty"` - Status MigrationTemplateStatus `json:"status,omitempty"` -} - -// +kubebuilder:object:root=true - -// MigrationTemplateList contains a list of MigrationTemplate -type MigrationTemplateList struct { - metav1.TypeMeta `json:",inline"` - metav1.ListMeta `json:"metadata,omitempty"` - Items []MigrationTemplate `json:"items"` -} diff --git a/pkg/types/migrationapi/type.go b/pkg/types/migrationapi/type.go deleted file mode 100644 index 32f8cca94..000000000 --- a/pkg/types/migrationapi/type.go +++ /dev/null @@ -1,213 +0,0 @@ -/* -Copyright (C) 2022-2024 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package v1alpha1 - -import ( - "strings" - - appv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" -) - -// DBTypeEnum defines the MigrationTemplate CR .spec.Source.DbType or .spec.Sink.DbType -// +enum -// +kubebuilder:validation:Enum={MySQL, PostgreSQL} -type DBTypeEnum string - -const ( - MigrationDBTypeMySQL DBTypeEnum = "MySQL" // default value - MigrationDBTypePostgreSQL DBTypeEnum = "PostgreSQL" -) - -func (d DBTypeEnum) String() string { - return string(d) -} - -// TaskTypeEnum defines the MigrationTask CR .spec.taskType -// +enum -// +kubebuilder:validation:Enum={initialization,cdc,initialization-and-cdc,initialization-and-twoway-cdc} -type TaskTypeEnum string - -const ( - Initialization TaskTypeEnum = "initialization" - CDC TaskTypeEnum = "cdc" - InitializationAndCdc TaskTypeEnum = "initialization-and-cdc" // default value -) - -// EndpointTypeEnum defines the MigrationTask CR .spec.source.endpointType and .spec.sink.endpointType -// +enum -// +kubebuilder:validation:Enum={address} -type EndpointTypeEnum string - -const ( - AddressDirectConnect EndpointTypeEnum = "address" // default value -) - -// non-use yet - -type ConflictPolicyEnum string - -const ( - Ignore ConflictPolicyEnum = "ignore" // default in FullLoad - Override ConflictPolicyEnum = "override" // default in CDC -) - -// DMLOpEnum defines the MigrationTask CR .spec.migrationObj -// +enum -// +kubebuilder:validation:Enum={all,none,insert,update,delete} -type DMLOpEnum string - -const ( - AllDML DMLOpEnum = "all" - NoneDML DMLOpEnum = "none" - Insert DMLOpEnum = "insert" - Update DMLOpEnum = "update" - Delete DMLOpEnum = "delete" -) - -// DDLOpEnum defines the MigrationTask CR .spec.migrationObj -// +enum -// +kubebuilder:validation:Enum={all,none} -type DDLOpEnum string - -const ( - AllDDL DDLOpEnum = "all" - NoneDDL DDLOpEnum = "none" -) - -// DCLOpEnum defines the MigrationTask CR .spec.migrationObj -// +enum -// +kubebuilder:validation:Enum={all,none} -type DCLOpEnum string - -const ( - AllDCL DDLOpEnum = "all" - NoneDCL DDLOpEnum = "none" -) - -// TaskStatus defines the MigrationTask CR .status.taskStatus -// +enum -// +kubebuilder:validation:Enum={Prepare,InitPrepared,Init,InitFinished,Running,Cached,Pause,Done} -type TaskStatus string - -const ( - PrepareStatus TaskStatus = "Prepare" - InitPrepared TaskStatus = "InitPrepared" - InitStatus TaskStatus = "Init" - InitFinished TaskStatus = "InitFinished" - RunningStatus TaskStatus = "Running" - CachedStatus TaskStatus = "Cached" - PauseStatus TaskStatus = "Pause" - DoneStatus TaskStatus = "Done" -) - -// StepEnum defines the MigrationTask CR .spec.steps -// +enum -// +kubebuilder:validation:Enum={preCheck,initStruct,initData,initStructLater} -type StepEnum string - -const ( - StepPreCheck StepEnum = "preCheck" - StepStructPreFullLoad StepEnum = "initStruct" - StepFullLoad StepEnum = "initData" - StepStructAfterFullLoad StepEnum = "initStructLater" - StepInitialization StepEnum = "initialization" - StepPreDelete StepEnum = "preDelete" - StepCdc StepEnum = "cdc" -) - -func (s StepEnum) String() string { - return string(s) -} - -func (s StepEnum) LowerCaseString() string { - return strings.ToLower(s.String()) -} - -func (s StepEnum) CliString() string { - switch s { - case StepPreCheck: - return CliStepPreCheck.String() - case StepStructPreFullLoad: - return CliStepInitStruct.String() - case StepFullLoad: - return CliStepInitData.String() - case StepCdc: - return CliStepCdc.String() - default: - return "unknown" - } -} - -type CliStepEnum string - -const ( - CliStepGlobal CliStepEnum = "global" - CliStepPreCheck CliStepEnum = "precheck" - CliStepInitStruct CliStepEnum = "init-struct" - CliStepInitData CliStepEnum = "init-data" - CliStepCdc CliStepEnum = "cdc" -) - -func (s CliStepEnum) String() string { - return string(s) -} - -// Phase defines the MigrationTemplate CR .status.phase -// +enum -// +kubebuilder:validation:Enum={Available,Unavailable} -type Phase string - -const ( - AvailablePhase Phase = "Available" - UnavailablePhase Phase = "Unavailable" -) - -type MigrationObjects struct { - Task *MigrationTask - Template *MigrationTemplate - - Jobs *batchv1.JobList - Pods *v1.PodList - StatefulSets *appv1.StatefulSetList -} - -// +k8s:deepcopy-gen=false - -type IntOrStringMap map[string]interface{} - -func (in *IntOrStringMap) DeepCopyInto(out *IntOrStringMap) { - if in == nil { - *out = nil - } else { - *out = runtime.DeepCopyJSON(*in) - } -} - -func (in *IntOrStringMap) DeepCopy() *IntOrStringMap { - if in == nil { - return nil - } - out := new(IntOrStringMap) - in.DeepCopyInto(out) - return out -} diff --git a/pkg/types/types.go b/pkg/types/types.go index a9c1452e8..02ed5c6d5 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -190,14 +190,6 @@ const ( ResourceStorageProviders = "storageproviders" ) -// Migration API group -const ( - MigrationAPIGroup = "datamigration.apecloud.io" - MigrationAPIVersion = "v1alpha1" - ResourceMigrationTasks = "migrationtasks" - ResourceMigrationTemplates = "migrationtemplates" -) - // Crd Api group const ( CustomResourceDefinitionAPIGroup = "apiextensions.k8s.io" @@ -468,22 +460,6 @@ func ServiceAccountGVR() schema.GroupVersionResource { return schema.GroupVersionResource{Group: corev1.GroupName, Version: K8sCoreAPIVersion, Resource: ServiceAccounts} } -func MigrationTaskGVR() schema.GroupVersionResource { - return schema.GroupVersionResource{ - Group: MigrationAPIGroup, - Version: MigrationAPIVersion, - Resource: ResourceMigrationTasks, - } -} - -func MigrationTemplateGVR() schema.GroupVersionResource { - return schema.GroupVersionResource{ - Group: MigrationAPIGroup, - Version: MigrationAPIVersion, - Resource: ResourceMigrationTemplates, - } -} - func CustomResourceDefinitionGVR() schema.GroupVersionResource { return schema.GroupVersionResource{ Group: CustomResourceDefinitionAPIGroup,