diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index db092280c4..ace2ddf4dd 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -7,6 +7,7 @@ on: pull_request: branches: - main + types: [review_requested, ready_for_review, synchronize] concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index 475f939cb4..76da3ea856 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -4,6 +4,7 @@ on: pull_request: branches: - main + types: [review_requested, ready_for_review, synchronize] concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} diff --git a/apis/core/v1alpha1/common_types.go b/apis/core/v1alpha1/common_types.go index 7a2935fd69..f4d130f0ed 100644 --- a/apis/core/v1alpha1/common_types.go +++ b/apis/core/v1alpha1/common_types.go @@ -149,6 +149,9 @@ type Volume struct { Storage resource.Quantity `json:"storage"` // StorageClassName means the storage class the volume used. + // You can modify volumes' attributes by changing the StorageClass + // when VolumeAttributesClass is not available. + // Note that only newly created PV will use the new StorageClass. StorageClassName *string `json:"storageClassName,omitempty"` } diff --git a/apis/core/v1alpha1/doc.go b/apis/core/v1alpha1/doc.go index a70f967df0..198304970a 100644 --- a/apis/core/v1alpha1/doc.go +++ b/apis/core/v1alpha1/doc.go @@ -6,6 +6,7 @@ // // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=core,resources=persistentvolumes,verbs=get;list;watch // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete @@ -14,6 +15,8 @@ // // +kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete // +// +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch +// // +kubebuilder:rbac:groups=core.pingcap.com,resources=clusters,verbs=get;list;watch;update // +kubebuilder:rbac:groups=core.pingcap.com,resources=clusters/status,verbs=get;update;patch // diff --git a/cmd/operator/main.go b/cmd/operator/main.go index eb571cf625..877753b875 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap/zapcore" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" _ "k8s.io/client-go/plugin/pkg/client/auth" @@ -129,6 +130,12 @@ func main() { Label: labels.Everything(), }, &appsv1.ControllerRevision{}: {}, + &corev1.PersistentVolume{}: { + Label: labels.Everything(), + }, + &storagev1.StorageClass{}: { + Label: labels.Everything(), + }, }, DefaultLabelSelector: labels.SelectorFromSet(labels.Set{ v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, diff --git a/go.mod b/go.mod index 45b2f79216..f9033ccb15 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,10 @@ toolchain go1.22.3 require ( github.com/Masterminds/semver/v3 v3.3.0 + github.com/aws/aws-sdk-go-v2 v1.30.5 + github.com/aws/aws-sdk-go-v2/config v1.27.35 + github.com/aws/aws-sdk-go-v2/service/ec2 v1.177.3 + github.com/aws/smithy-go v1.20.4 github.com/docker/go-units v0.5.0 github.com/evanphx/json-patch v4.12.0+incompatible github.com/go-logr/logr v1.4.2 @@ -28,6 +32,7 @@ require ( k8s.io/cli-runtime v0.31.0 k8s.io/client-go v0.31.0 k8s.io/code-generator v0.31.0 + k8s.io/klog/v2 v2.130.1 k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 k8s.io/kubectl v0.31.0 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 @@ -43,6 +48,16 @@ require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/MakeNowJust/heredoc v1.0.0 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.33 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.22.8 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.8 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.30.8 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -125,7 +140,6 @@ require ( k8s.io/apiextensions-apiserver v0.31.0 // indirect k8s.io/component-base v0.31.0 // indirect k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70 // indirect - k8s.io/klog/v2 v2.130.1 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/kustomize/api v0.17.2 // indirect sigs.k8s.io/kustomize/kyaml v0.17.1 // indirect diff --git a/go.sum b/go.sum index 96f86deb6e..f19fc49644 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,34 @@ github.com/Masterminds/semver/v3 v3.3.0 h1:B8LGeaivUe71a5qox1ICM/JLl0NqZSW5CHyL+ github.com/Masterminds/semver/v3 v3.3.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDyK0g= +github.com/aws/aws-sdk-go-v2 v1.30.5/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2/config v1.27.35 h1:jeFgiWYNV0vrgdZqB4kZBjYNdy0IKkwrAjr2fwpHIig= +github.com/aws/aws-sdk-go-v2/config v1.27.35/go.mod h1:qnpEvTq8ZfjrCqmJGRfWZuF+lGZ/vG8LK2K0L/TY1gQ= +github.com/aws/aws-sdk-go-v2/credentials v1.17.33 h1:lBHAQQznENv0gLHAZ73ONiTSkCtr8q3pSqWrpbBBZz0= +github.com/aws/aws-sdk-go-v2/credentials v1.17.33/go.mod h1:MBuqCUOT3ChfLuxNDGyra67eskx7ge9e3YKYBce7wpI= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 h1:pfQ2sqNpMVK6xz2RbqLEL0GH87JOwSxPV2rzm8Zsb74= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13/go.mod h1:NG7RXPUlqfsCLLFfi0+IpKN4sCB9D9fw/qTaSB+xRoU= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 h1:pI7Bzt0BJtYA0N/JEC6B8fJ4RBrEMi1LBrkMdFYNSnQ= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17/go.mod h1:Dh5zzJYMtxfIjYW+/evjQ8uj2OyR/ve2KROHGHlSFqE= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 h1:Mqr/V5gvrhA2gvgnF42Zh5iMiQNcOYthFYwCyrnuWlc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17/go.mod h1:aLJpZlCmjE+V+KtN1q1uyZkfnUWpQGpbsn89XPKyzfU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/ec2 v1.177.3 h1:dqdCh1M8h+j8OGNUpxTs7eBPFr6lOdLpdlE6IPLLSq4= +github.com/aws/aws-sdk-go-v2/service/ec2 v1.177.3/go.mod h1:TFSALWR7Xs7+KyMM87ZAYxncKFBvzEt2rpK/BJCH2ps= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 h1:rfprUlsdzgl7ZL2KlXiUAoJnI/VxfHCvDFr2QDFj6u4= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19/go.mod h1:SCWkEdRq8/7EK60NcvvQ6NXKuTcchAD4ROAsC37VEZE= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.8 h1:JRwuL+S1Qe1owZQoxblV7ORgRf2o0SrtzDVIbaVCdQ0= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.8/go.mod h1:eEygMHnTKH/3kNp9Jr1n3PdejuSNcgwLe1dWgQtO0VQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.8 h1:+HpGETD9463PFSj7lX5+eq7aLDs85QUIA+NBkeAsscA= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.8/go.mod h1:bCbAxKDqNvkHxRaIMnyVPXPo+OaPRwvmgzMxbz1VKSA= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.8 h1:bAi+4p5EKnni+jrfcAhb7iHFQ24bthOAV9t0taf3DCE= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.8/go.mod h1:NXi1dIAGteSaRLqYgarlhP/Ij0cFT+qmCwiJqWh/U5o= +github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= +github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= diff --git a/manifests/crd/core.pingcap.com_pdgroups.yaml b/manifests/crd/core.pingcap.com_pdgroups.yaml index 1b35bc1108..ad1e2ab7fc 100644 --- a/manifests/crd/core.pingcap.com_pdgroups.yaml +++ b/manifests/crd/core.pingcap.com_pdgroups.yaml @@ -8654,8 +8654,11 @@ spec: pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true storageClassName: - description: StorageClassName means the storage class - the volume used. + description: |- + StorageClassName means the storage class the volume used. + You can modify volumes' attributes by changing the StorageClass + when VolumeAttributesClass is not available. + Note that only newly created PV will use the new StorageClass. type: string required: - for diff --git a/manifests/crd/core.pingcap.com_pds.yaml b/manifests/crd/core.pingcap.com_pds.yaml index 118edf7b70..9f60e78a69 100644 --- a/manifests/crd/core.pingcap.com_pds.yaml +++ b/manifests/crd/core.pingcap.com_pds.yaml @@ -8435,8 +8435,11 @@ spec: pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true storageClassName: - description: StorageClassName means the storage class the volume - used. + description: |- + StorageClassName means the storage class the volume used. + You can modify volumes' attributes by changing the StorageClass + when VolumeAttributesClass is not available. + Note that only newly created PV will use the new StorageClass. type: string required: - for diff --git a/manifests/crd/core.pingcap.com_tidbgroups.yaml b/manifests/crd/core.pingcap.com_tidbgroups.yaml index bc2ad0f03a..011fa93975 100644 --- a/manifests/crd/core.pingcap.com_tidbgroups.yaml +++ b/manifests/crd/core.pingcap.com_tidbgroups.yaml @@ -8670,8 +8670,11 @@ spec: pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true storageClassName: - description: StorageClassName means the storage class - the volume used. + description: |- + StorageClassName means the storage class the volume used. + You can modify volumes' attributes by changing the StorageClass + when VolumeAttributesClass is not available. + Note that only newly created PV will use the new StorageClass. type: string required: - for diff --git a/manifests/crd/core.pingcap.com_tidbs.yaml b/manifests/crd/core.pingcap.com_tidbs.yaml index 2e5f31af44..078443948a 100644 --- a/manifests/crd/core.pingcap.com_tidbs.yaml +++ b/manifests/crd/core.pingcap.com_tidbs.yaml @@ -8423,8 +8423,11 @@ spec: pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true storageClassName: - description: StorageClassName means the storage class the volume - used. + description: |- + StorageClassName means the storage class the volume used. + You can modify volumes' attributes by changing the StorageClass + when VolumeAttributesClass is not available. + Note that only newly created PV will use the new StorageClass. type: string required: - for diff --git a/manifests/crd/core.pingcap.com_tiflashes.yaml b/manifests/crd/core.pingcap.com_tiflashes.yaml index 7e2e37beff..0bfa3392c7 100644 --- a/manifests/crd/core.pingcap.com_tiflashes.yaml +++ b/manifests/crd/core.pingcap.com_tiflashes.yaml @@ -8451,8 +8451,11 @@ spec: pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true storageClassName: - description: StorageClassName means the storage class the volume - used. + description: |- + StorageClassName means the storage class the volume used. + You can modify volumes' attributes by changing the StorageClass + when VolumeAttributesClass is not available. + Note that only newly created PV will use the new StorageClass. type: string required: - for diff --git a/manifests/crd/core.pingcap.com_tiflashgroups.yaml b/manifests/crd/core.pingcap.com_tiflashgroups.yaml index a80a1909c9..65167b8d68 100644 --- a/manifests/crd/core.pingcap.com_tiflashgroups.yaml +++ b/manifests/crd/core.pingcap.com_tiflashgroups.yaml @@ -8658,8 +8658,11 @@ spec: pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true storageClassName: - description: StorageClassName means the storage class - the volume used. + description: |- + StorageClassName means the storage class the volume used. + You can modify volumes' attributes by changing the StorageClass + when VolumeAttributesClass is not available. + Note that only newly created PV will use the new StorageClass. type: string required: - for diff --git a/manifests/crd/core.pingcap.com_tikvgroups.yaml b/manifests/crd/core.pingcap.com_tikvgroups.yaml index ded3bade69..b3c0ca3a89 100644 --- a/manifests/crd/core.pingcap.com_tikvgroups.yaml +++ b/manifests/crd/core.pingcap.com_tikvgroups.yaml @@ -8643,8 +8643,11 @@ spec: pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true storageClassName: - description: StorageClassName means the storage class - the volume used. + description: |- + StorageClassName means the storage class the volume used. + You can modify volumes' attributes by changing the StorageClass + when VolumeAttributesClass is not available. + Note that only newly created PV will use the new StorageClass. type: string required: - for diff --git a/manifests/crd/core.pingcap.com_tikvs.yaml b/manifests/crd/core.pingcap.com_tikvs.yaml index ef3a9d33dd..04cc0c1988 100644 --- a/manifests/crd/core.pingcap.com_tikvs.yaml +++ b/manifests/crd/core.pingcap.com_tikvs.yaml @@ -8430,8 +8430,11 @@ spec: pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true storageClassName: - description: StorageClassName means the storage class the volume - used. + description: |- + StorageClassName means the storage class the volume used. + You can modify volumes' attributes by changing the StorageClass + when VolumeAttributesClass is not available. + Note that only newly created PV will use the new StorageClass. type: string required: - for diff --git a/manifests/rbac/role.yaml b/manifests/rbac/role.yaml index c5a213cd83..aaa89cf179 100644 --- a/manifests/rbac/role.yaml +++ b/manifests/rbac/role.yaml @@ -48,6 +48,7 @@ rules: - "" resources: - nodes + - persistentvolumes - secrets verbs: - get @@ -115,3 +116,11 @@ rules: - patch - update - watch +- apiGroups: + - storage.k8s.io + resources: + - storageclasses + verbs: + - get + - list + - watch diff --git a/pkg/client/fake.go b/pkg/client/fake.go index 6ed28957b9..f14bae20a7 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -286,7 +286,7 @@ func (c *fakeUnderlayClient) Get(ctx context.Context, key client.ObjectKey, obj if namespaced { action = testing.NewGetAction(mapping.Resource, key.Namespace, key.Name) } else { - action = testing.NewRootGetAction(mapping.Resource, key.Namespace) + action = testing.NewRootGetAction(mapping.Resource, key.Name) } newObj, err := c.Invokes(action, nil) diff --git a/pkg/controllers/tikv/tasks/ctx.go b/pkg/controllers/tikv/tasks/ctx.go index b56d6d63e2..2a68f5756f 100644 --- a/pkg/controllers/tikv/tasks/ctx.go +++ b/pkg/controllers/tikv/tasks/ctx.go @@ -4,6 +4,7 @@ import ( "context" "strconv" + "github.com/aws/aws-sdk-go-v2/config" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -15,6 +16,8 @@ import ( pdv1 "github.com/pingcap/tidb-operator/pkg/timanager/apis/pd/v1" pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/volumes" + "github.com/pingcap/tidb-operator/pkg/volumes/cloud/aws" ) type ReconcileContext struct { @@ -44,6 +47,8 @@ type ReconcileContext struct { // Since the config map's name is the same as the tidb's name, // we use this variable to decide whether to recreate pods. ConfigChanged bool + + VolumeModifier volumes.Modifier } func (ctx *ReconcileContext) Self() *ReconcileContext { @@ -123,6 +128,13 @@ func (t *TaskContext) Sync(ctx task.Context[ReconcileContext]) task.Result { return task.Complete().With("tikv is suspended") } + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + t.Logger.Error(err, "failed to load aws config") + } else { + rtx.VolumeModifier = volumes.NewRawModifier(aws.NewEBSModifier(cfg, t.Logger), t.Client, t.Logger) + } + c, ok := t.PDClientManager.Get(pdm.PrimaryKey(cluster.Namespace, cluster.Name)) if !ok { return task.Fail().With("pd client is not registered") diff --git a/pkg/controllers/tikv/tasks/pvc.go b/pkg/controllers/tikv/tasks/pvc.go index 578eca4a5e..eac571e870 100644 --- a/pkg/controllers/tikv/tasks/pvc.go +++ b/pkg/controllers/tikv/tasks/pvc.go @@ -31,23 +31,57 @@ func (t *TaskPVC) Sync(ctx task.Context[ReconcileContext]) task.Result { rtx := ctx.Self() if rtx.Cluster.ShouldSuspendCompute() { - return task.Complete().With("skip pvc for suspension") + return task.Complete().With("skip expectPVC for suspension") } pvcs := newPVCs(rtx.TiKV) - for _, pvc := range pvcs { - if err := t.Client.Apply(rtx, pvc); err != nil { - return task.Fail().With("can't apply pvc %s of pd: %v", pvc.Name, err) + for _, expectPVC := range pvcs { + var actualPVC corev1.PersistentVolumeClaim + if err := t.Client.Get(rtx, client.ObjectKey{ + Namespace: expectPVC.Namespace, + Name: expectPVC.Name, + }, &actualPVC); err != nil { + if client.IgnoreNotFound(err) != nil { + return task.Fail().With("can't get expectPVC %s/%s of tikv: %v", expectPVC.Namespace, expectPVC.Name, err) + } + + // Create PVC + if e := t.Client.Apply(rtx, expectPVC); e != nil { + return task.Fail().With("can't create expectPVC %s/%s of tikv: %v", expectPVC.Namespace, expectPVC.Name, e) + } + continue } - } - // TODO: check config updation + // Set default storage class name if it's not specified and the claim is bound. + if expectPVC.Spec.StorageClassName == nil && actualPVC.Status.Phase == corev1.ClaimBound { + expectPVC.Spec.StorageClassName = actualPVC.Spec.StorageClassName + } + vol, err := rtx.VolumeModifier.GetActualVolume(ctx, expectPVC, &actualPVC) + if err != nil { + return task.Fail().Continue().With("failed to get actual volume %s/%s: %v", expectPVC.Namespace, expectPVC.Name, err) + } + if rtx.VolumeModifier.ShouldModify(ctx, vol) { + t.Logger.Info("modifying volume's attributes", "expectPVC", expectPVC.Name) + if e := rtx.VolumeModifier.Modify(ctx, vol); e != nil { + return task.Fail().Continue().With("failed to modify volume's attributes %s/%s: %v", expectPVC.Namespace, expectPVC.Name, e) + } + } else { + if expectPVC.Spec.StorageClassName != nil && actualPVC.Spec.StorageClassName != nil && *expectPVC.Spec.StorageClassName != *actualPVC.Spec.StorageClassName { + // Avoid updating the storage class name as it's immutable. + expectPVC.Spec.StorageClassName = actualPVC.Spec.StorageClassName + } + + if err := t.Client.Apply(rtx, expectPVC); err != nil { + return task.Fail().With("can't update expectPVC %s/%s of tikv: %v", expectPVC.Namespace, expectPVC.Name, err) + } + } + } return task.Complete().With("pvcs are synced") } func newPVCs(tikv *v1alpha1.TiKV) []*corev1.PersistentVolumeClaim { - pvcs := []*corev1.PersistentVolumeClaim{} + var pvcs []*corev1.PersistentVolumeClaim for _, vol := range tikv.Spec.Volumes { pvcs = append(pvcs, &corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/utils/fake/fake.go b/pkg/utils/fake/fake.go index 7b8cdeaf02..9abd0a34ba 100644 --- a/pkg/utils/fake/fake.go +++ b/pkg/utils/fake/fake.go @@ -52,6 +52,18 @@ func Label[T any, PT Object[T]](k, v string) ChangeFunc[T, PT] { } } +func Annotation[T any, PT Object[T]](k, v string) ChangeFunc[T, PT] { + return func(obj PT) PT { + a := obj.GetAnnotations() + if a == nil { + a = map[string]string{} + } + a[k] = v + obj.SetAnnotations(a) + return obj + } +} + func SetDeleteTimestamp[T any, PT Object[T]]() ChangeFunc[T, PT] { return func(obj PT) PT { now := metav1.Now() diff --git a/pkg/utils/time/clock.go b/pkg/utils/time/clock.go new file mode 100644 index 0000000000..59d71000e9 --- /dev/null +++ b/pkg/utils/time/clock.go @@ -0,0 +1,36 @@ +package time + +import "time" + +type Clock interface { + Now() time.Time + Since(time.Time) time.Duration +} + +var ( + _ Clock = &RealClock{} + _ Clock = &FakeClock{} +) + +type RealClock struct{} + +func (RealClock) Since(t time.Time) time.Duration { + return time.Since(t) +} + +func (RealClock) Now() time.Time { + return time.Now() +} + +type FakeClock struct { + NowFunc func() time.Time + SinceFunc func(time.Time) time.Duration +} + +func (f FakeClock) Now() time.Time { + return f.NowFunc() +} + +func (f FakeClock) Since(t time.Time) time.Duration { + return f.SinceFunc(t) +} diff --git a/pkg/volumes/cloud/aws/ebs_modifier.go b/pkg/volumes/cloud/aws/ebs_modifier.go new file mode 100644 index 0000000000..c0cd5365b8 --- /dev/null +++ b/pkg/volumes/cloud/aws/ebs_modifier.go @@ -0,0 +1,330 @@ +package aws + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/aws/smithy-go" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/utils/ptr" + + "github.com/pingcap/tidb-operator/pkg/volumes/cloud" +) + +var ( + // defaultWaitDuration is the cooldown period for EBS ModifyVolume. + // Multiple ModifyVolume calls for the same volume within a 6-hour period will fail. + defaultWaitDuration = time.Hour * 6 + + // maxStorageSizeInGiB is the maximum size of EBS volume in GiB. + // Ref: https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_ModifyVolume.html#API_ModifyVolume_RequestParameters + maxStorageSizeInGiB = map[types.VolumeType]int{ + types.VolumeTypeGp2: 16384, + types.VolumeTypeGp3: 16384, + types.VolumeTypeIo1: 16384, + types.VolumeTypeIo2: 65536, + types.VolumeTypeSc1: 16384, + types.VolumeTypeSt1: 16384, + types.VolumeTypeStandard: 1024, + } + minStorageSizeInGiB = map[types.VolumeType]int{ + types.VolumeTypeGp2: 1, + types.VolumeTypeGp3: 1, + types.VolumeTypeIo1: 4, + types.VolumeTypeIo2: 4, + types.VolumeTypeSc1: 125, + types.VolumeTypeSt1: 125, + types.VolumeTypeStandard: 1, + } + + maxIOPS = map[types.VolumeType]int{ + types.VolumeTypeGp3: 16000, + types.VolumeTypeIo1: 64000, + types.VolumeTypeIo2: 256000, + } + minIOPS = map[types.VolumeType]int{ + types.VolumeTypeGp3: 3000, + types.VolumeTypeIo1: 100, + types.VolumeTypeIo2: 100, + } + + maxThroughput = 1000 + minThroughput = 125 +) + +const ( + paramKeyThroughput = "throughput" + paramKeyIOPS = "iops" + paramKeyType = "type" + + errCodeNotFound = "InvalidVolumeModification.NotFound" +) + +type EC2VolumeAPI interface { + ModifyVolume(ctx context.Context, param *ec2.ModifyVolumeInput, optFns ...func(*ec2.Options)) (*ec2.ModifyVolumeOutput, error) + DescribeVolumesModifications(ctx context.Context, param *ec2.DescribeVolumesModificationsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeVolumesModificationsOutput, error) +} + +type EBSModifier struct { + cli EC2VolumeAPI + logger logr.Logger +} + +type Volume struct { + VolumeId string + Size *int32 + IOPS *int32 + Throughput *int32 + Type types.VolumeType + + IsCompleted bool + IsFailed bool +} + +func NewEBSModifier(cfg aws.Config, logger logr.Logger) cloud.VolumeModifier { + return &EBSModifier{ + cli: ec2.NewFromConfig(cfg), + logger: logger, + } +} + +func (m *EBSModifier) Name() string { + return "ebs.csi.aws.com" +} + +func (m *EBSModifier) Validate(_, _ *corev1.PersistentVolumeClaim, ssc, dsc *storagev1.StorageClass) error { + if ssc != nil && dsc != nil { + if ssc.Provisioner != dsc.Provisioner { + return fmt.Errorf("provisioner should not be changed, now from %s to %s", ssc.Provisioner, dsc.Provisioner) + } + if ssc.Provisioner != m.Name() { + return fmt.Errorf("provisioner should be %s, now is %s", m.Name(), ssc.Provisioner) + } + } else { + m.logger.Info("storage class is nil, skip validation") + } + + return nil +} + +func (m *EBSModifier) Modify(ctx context.Context, pvc *corev1.PersistentVolumeClaim, pv *corev1.PersistentVolume, sc *storagev1.StorageClass) ( /*wait*/ bool, error) { + if pv == nil { + m.logger.Info("Persistent volume is nil, skip modifying PV. This may be caused by no relevant permissions", "pv", pvc.Spec.VolumeName) + return false, nil + } + + desired, err := m.getExpectedVolume(pvc, pv, sc) + if err != nil { + return false, err + } + + actual, err := m.getCurrentVolumeStatus(ctx, desired.VolumeId) + if err != nil { + return false, err + } + + if actual != nil { + // current one is matched with the desired + if !m.diffVolume(actual, desired) { + if actual.IsCompleted { + return false, nil + } + if !actual.IsFailed { + return true, nil + } + } + } + + m.logger.Info("call aws api to modify volume for pvc", "pvc_namespace", pvc.Namespace, "pvc_name", pvc.Name) + + // retry to modify the volume + if _, err := m.cli.ModifyVolume(ctx, &ec2.ModifyVolumeInput{ + VolumeId: &desired.VolumeId, + Size: desired.Size, + Iops: desired.IOPS, + Throughput: desired.Throughput, + VolumeType: desired.Type, + }); err != nil { + return false, err + } + + return true, nil +} + +// If some params are not set, assume they are equal. +func (m *EBSModifier) diffVolume(actual, desired *Volume) bool { + if diffInt32(actual.IOPS, desired.IOPS) { + return true + } + if diffInt32(actual.Throughput, desired.Throughput) { + return true + } + if diffInt32(actual.Size, desired.Size) { + return true + } + if actual.Type == "" || desired.Type == "" { + return false + } + if actual.Type != desired.Type { + return true + } + + return false +} + +func diffInt32(a, b *int32) bool { + if a == nil || b == nil { + return false + } + + if *a == *b { + return false + } + + return true +} + +func (m *EBSModifier) getCurrentVolumeStatus(ctx context.Context, id string) (*Volume, error) { + res, err := m.cli.DescribeVolumesModifications(ctx, &ec2.DescribeVolumesModificationsInput{ + VolumeIds: []string{id}, + }) + if err != nil { + var ae smithy.APIError + if errors.As(err, &ae) { + if ae.ErrorCode() == errCodeNotFound { + return nil, nil + } + } + return nil, err + } + + // TODO: maybe cool down time should also be returned to avoid + // recalling ModifyVolume too many times + for _, s := range res.VolumesModifications { + if s.VolumeId == nil || *s.VolumeId != id { + continue + } + v := Volume{ + VolumeId: *s.VolumeId, + Size: s.TargetSize, + IOPS: s.TargetIops, + Throughput: s.TargetThroughput, + Type: s.TargetVolumeType, + } + switch s.ModificationState { + case types.VolumeModificationStateCompleted: + v.IsCompleted = true + case types.VolumeModificationStateFailed: + v.IsFailed = true + case types.VolumeModificationStateModifying: + case types.VolumeModificationStateOptimizing: + v.IsCompleted = true + } + + return &v, nil + } + + return nil, nil +} + +func (m *EBSModifier) getExpectedVolume(pvc *corev1.PersistentVolumeClaim, pv *corev1.PersistentVolume, sc *storagev1.StorageClass) (*Volume, error) { + // get storage size in GiB from PVC + quantity := pvc.Spec.Resources.Requests[corev1.ResourceStorage] + sizeBytes := quantity.ScaledValue(0) + size := sizeBytes / 1024 / 1024 / 1024 + + v := &Volume{} + v.Size = ptr.To(int32(size)) + v.VolumeId = pv.Spec.CSI.VolumeHandle + if err := m.setArgsFromStorageClass(v, sc); err != nil { + return nil, err + } + + return v, validateVolume(v) +} + +func (m *EBSModifier) MinWaitDuration() time.Duration { + return defaultWaitDuration +} + +func (m *EBSModifier) setArgsFromStorageClass(v *Volume, sc *storagev1.StorageClass) error { + if sc == nil { + return nil + } + throughput, err := getParamInt32(sc.Parameters, paramKeyThroughput) + if err != nil { + return err + } + v.Throughput = throughput + + iops, err := getParamInt32(sc.Parameters, paramKeyIOPS) + if err != nil { + return err + } + v.IOPS = iops + + typ := sc.Parameters[paramKeyType] + v.Type = types.VolumeType(typ) + + return nil +} + +func getParamInt32(params map[string]string, key string) (*int32, error) { + str, ok := params[key] + if !ok { + return nil, nil + } + if str == "" { + return nil, nil + } + param, err := strconv.ParseInt(str, 10, 32) + if err != nil { + return nil, fmt.Errorf("can't parse %v param in storage class: %v", key, err) + } + + return ptr.To(int32(param)), nil +} + +func validateVolume(desired *Volume) error { + if desired == nil || string(desired.Type) == "" { + return nil + } + + if desired.Size != nil { + size := int(*desired.Size) + minSize, ok1 := minStorageSizeInGiB[desired.Type] + maxSize, ok2 := maxStorageSizeInGiB[desired.Type] + if ok1 && ok2 && (size < minSize || size > maxSize) { + return fmt.Errorf("size %d is out of range [%d, %d] for volume type %s", size, minSize, maxSize, desired.Type) + } + } + + if desired.IOPS != nil { + iops := int(*desired.IOPS) + minIops, ok1 := minIOPS[desired.Type] + maxIops, ok2 := maxIOPS[desired.Type] + if !ok1 || !ok2 { + return fmt.Errorf("modifying IOPS for volume type %s is not supported", desired.Type) + } + if iops < minIops || iops > maxIops { + return fmt.Errorf("iops %d is out of range [%d, %d] for volume type %s", iops, minIops, maxIops, desired.Type) + } + } + + if desired.Throughput != nil { + throughput := int(*desired.Throughput) + if throughput < minThroughput || throughput > maxThroughput { + return fmt.Errorf("throughput %d is out of range [%d, %d]", throughput, minThroughput, maxThroughput) + } + } + + return nil +} diff --git a/pkg/volumes/cloud/aws/ebs_modifier_test.go b/pkg/volumes/cloud/aws/ebs_modifier_test.go new file mode 100644 index 0000000000..42a8cebc54 --- /dev/null +++ b/pkg/volumes/cloud/aws/ebs_modifier_test.go @@ -0,0 +1,273 @@ +package aws + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/go-logr/logr" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/utils/ptr" +) + +func newTestPVC(size string) *corev1.PersistentVolumeClaim { + return &corev1.PersistentVolumeClaim{ + Spec: corev1.PersistentVolumeClaimSpec{ + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse(size), + }, + }, + }, + } +} + +func newTestPV(volId string) *corev1.PersistentVolume { + return &corev1.PersistentVolume{ + Spec: corev1.PersistentVolumeSpec{ + PersistentVolumeSource: corev1.PersistentVolumeSource{ + CSI: &corev1.CSIPersistentVolumeSource{ + VolumeHandle: volId, + }, + }, + }, + } +} + +func newTestStorageClass(provisioner, typ, iops, throughput string) *storagev1.StorageClass { + return &storagev1.StorageClass{ + Provisioner: provisioner, + Parameters: map[string]string{ + paramKeyIOPS: iops, + paramKeyType: typ, + paramKeyThroughput: throughput, + }, + } +} + +func TestModifyVolume(t *testing.T) { + initialPVC := newTestPVC("10Gi") + initialPV := newTestPV("aaa") + initialSC := newTestStorageClass("", "gp3", "3000", "125") + + cases := []struct { + desc string + + pvc *corev1.PersistentVolumeClaim + pv *corev1.PersistentVolume + sc *storagev1.StorageClass + + getState GetVolumeStateFunc + + wait bool + hasErr bool + }{ + { + desc: "volume modification is failed, modify again", + pvc: initialPVC, + pv: initialPV, + sc: initialSC, + + getState: func(id string) types.VolumeModificationState { + return types.VolumeModificationStateFailed + }, + + wait: true, + hasErr: false, + }, + { + desc: "volume modification is optimizing, no need to wait to avoid waiting too long time", + pvc: initialPVC, + pv: initialPV, + sc: initialSC, + + getState: func(id string) types.VolumeModificationState { + return types.VolumeModificationStateOptimizing + }, + + wait: false, + hasErr: false, + }, + { + desc: "volume modification is completed, no need to wait", + pvc: initialPVC, + pv: initialPV, + sc: initialSC, + + getState: func(id string) types.VolumeModificationState { + return types.VolumeModificationStateCompleted + }, + + wait: false, + hasErr: false, + }, + { + desc: "volume modification is modifying, wait", + pvc: initialPVC, + pv: initialPV, + sc: initialSC, + + getState: func(id string) types.VolumeModificationState { + return types.VolumeModificationStateModifying + }, + + wait: true, + hasErr: false, + }, + { + desc: "volume has been modified, but size is changed", + pvc: newTestPVC("20Gi"), + pv: initialPV, + sc: initialSC, + + getState: func(id string) types.VolumeModificationState { + return types.VolumeModificationStateCompleted + }, + + wait: true, + hasErr: false, + }, + { + desc: "volume has been modified, but sc is changed", + pvc: initialPVC, + pv: initialPV, + sc: newTestStorageClass("", "io2", "3000", "300"), + + getState: func(id string) types.VolumeModificationState { + return types.VolumeModificationStateCompleted + }, + + wait: true, + hasErr: false, + }, + { + desc: "volume is modifying, but size/sc is changed", + pvc: newTestPVC("20Gi"), + pv: initialPV, + sc: newTestStorageClass("", "gp2", "3000", "300"), + + getState: func(id string) types.VolumeModificationState { + return types.VolumeModificationStateModifying + }, + + wait: false, + hasErr: true, + }, + } + + for _, tt := range cases { + t.Run(tt.desc, func(t *testing.T) { + g := NewGomegaWithT(t) + m := NewFakeEBSModifier(tt.getState) + + wait1, err := m.Modify(context.TODO(), initialPVC, initialPV, initialSC) + g.Expect(err).Should(Succeed(), tt.desc) + g.Expect(wait1).Should(BeTrue(), tt.desc) + + wait2, err := m.Modify(context.TODO(), tt.pvc, tt.pv, tt.sc) + if tt.hasErr { + g.Expect(err).Should(HaveOccurred(), tt.desc) + } else { + g.Expect(err).Should(Succeed(), tt.desc) + } + g.Expect(wait2).Should(Equal(tt.wait), tt.desc) + }) + } +} + +func TestValidate(t *testing.T) { + tests := []struct { + name string + ssc *storagev1.StorageClass + dsc *storagev1.StorageClass + wantErr bool + }{ + { + name: "same provisioner, but not ebs", + ssc: newTestStorageClass("foo", "gp3", "1000", "100"), + dsc: newTestStorageClass("foo", "gp3", "2000", "200"), + wantErr: true, + }, + { + name: "different provisioner", + ssc: newTestStorageClass("foo", "gp3", "1000", "100"), + dsc: newTestStorageClass("bar", "gp3", "2000", "200"), + wantErr: true, + }, + { + name: "happy path", + ssc: newTestStorageClass("ebs.csi.aws.com", "gp3", "1000", "100"), + dsc: newTestStorageClass("ebs.csi.aws.com", "gp3", "2000", "200"), + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &EBSModifier{logger: logr.Logger{}} + if err := m.Validate(nil, nil, tt.ssc, tt.dsc); (err != nil) != tt.wantErr { + t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_validateVolume(t *testing.T) { + tests := []struct { + name string + desired *Volume + wantErr bool + }{ + { + name: "empty volume type", + desired: &Volume{}, + }, + { + name: "nil volume", + desired: &Volume{}, + }, + { + name: "valid volume", + desired: &Volume{ + Size: ptr.To(int32(10)), + IOPS: ptr.To(int32(3000)), + Throughput: ptr.To(int32(125)), + Type: types.VolumeTypeGp3, + }, + wantErr: false, + }, + { + name: "invalid size", + desired: &Volume{ + Size: ptr.To(int32(100000)), + Type: types.VolumeTypeGp3, + }, + wantErr: true, + }, + { + name: "invalid IOPS", + desired: &Volume{ + IOPS: ptr.To(int32(20000)), + Type: types.VolumeTypeGp3, + }, + wantErr: true, + }, + { + name: "invalid throughput", + desired: &Volume{ + Throughput: ptr.To(int32(2000)), + Type: types.VolumeTypeGp3, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := validateVolume(tt.desired); (err != nil) != tt.wantErr { + t.Errorf("validateVolume() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/volumes/cloud/aws/fake.go b/pkg/volumes/cloud/aws/fake.go new file mode 100644 index 0000000000..fdda39ec2f --- /dev/null +++ b/pkg/volumes/cloud/aws/fake.go @@ -0,0 +1,101 @@ +package aws + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/aws/smithy-go" + + "github.com/pingcap/tidb-operator/pkg/volumes/cloud" +) + +func NewFakeEBSModifier(f GetVolumeStateFunc) cloud.VolumeModifier { + return &EBSModifier{ + cli: NewFakeEC2VolumeAPI(f), + } +} + +type GetVolumeStateFunc func(id string) types.VolumeModificationState + +type FakeEC2VolumeAPI struct { + vs []Volume + f GetVolumeStateFunc +} + +func NewFakeEC2VolumeAPI(f GetVolumeStateFunc) *FakeEC2VolumeAPI { + m := &FakeEC2VolumeAPI{ + f: f, + } + + return m +} + +func (m *FakeEC2VolumeAPI) ModifyVolume(ctx context.Context, param *ec2.ModifyVolumeInput, optFns ...func(*ec2.Options)) (*ec2.ModifyVolumeOutput, error) { + for i := range m.vs { + v := &m.vs[i] + if v.VolumeId == *param.VolumeId { + state := m.f(v.VolumeId) + switch state { + // NOTE(liubo02): I'm not sure the behavior to recall the aws api when the last modification + // is in some states + case types.VolumeModificationStateCompleted, types.VolumeModificationStateFailed: + m.vs[i] = Volume{ + VolumeId: *param.VolumeId, + Size: param.Size, + IOPS: param.Iops, + Throughput: param.Throughput, + Type: param.VolumeType, + } + + return &ec2.ModifyVolumeOutput{}, nil + } + + return nil, fmt.Errorf("volume %s has been modified or modification is not finished", v.VolumeId) + } + } + + v := Volume{ + VolumeId: *param.VolumeId, + Size: param.Size, + IOPS: param.Iops, + Throughput: param.Throughput, + Type: param.VolumeType, + } + + m.vs = append(m.vs, v) + + return &ec2.ModifyVolumeOutput{}, nil +} + +func (m *FakeEC2VolumeAPI) DescribeVolumesModifications(ctx context.Context, param *ec2.DescribeVolumesModificationsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeVolumesModificationsOutput, error) { + var mods []types.VolumeModification + for _, id := range param.VolumeIds { + for i := range m.vs { + v := m.vs[i] + if v.VolumeId != id { + continue + } + + mods = append(mods, types.VolumeModification{ + VolumeId: &v.VolumeId, + TargetIops: v.IOPS, + TargetSize: v.Size, + TargetThroughput: v.Throughput, + TargetVolumeType: v.Type, + ModificationState: m.f(id), + }) + } + } + + if len(mods) == 0 { + return nil, &smithy.GenericAPIError{ + Code: errCodeNotFound, + } + } + + return &ec2.DescribeVolumesModificationsOutput{ + VolumesModifications: mods, + }, nil +} diff --git a/pkg/volumes/cloud/interface.go b/pkg/volumes/cloud/interface.go new file mode 100644 index 0000000000..faa579cc49 --- /dev/null +++ b/pkg/volumes/cloud/interface.go @@ -0,0 +1,47 @@ +package cloud + +import ( + "context" + "time" + + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" +) + +type VolumeModifier interface { + // Name returns the name of the volume modifier. + Name() string + + // Modify modifies the underlay volume of pvc to match the args of storageclass. + // If no PV permission (e.g `-cluster-permission-pv=false`), the `pv` may be nil and will return `false, nil`. + Modify(ctx context.Context, pvc *corev1.PersistentVolumeClaim, pv *corev1.PersistentVolume, sc *storagev1.StorageClass) (bool, error) + + MinWaitDuration() time.Duration + + Validate(spvc, dpvc *corev1.PersistentVolumeClaim, ssc, dsc *storagev1.StorageClass) error +} + +// FakeVolumeModifier is a fake implementation of the VolumeModifier interface for unit testing. +type FakeVolumeModifier struct { + name string + modifyResult bool + modifyError error + minWait time.Duration + validateError error +} + +func (f *FakeVolumeModifier) Name() string { + return f.name +} + +func (f *FakeVolumeModifier) Modify(ctx context.Context, pvc *corev1.PersistentVolumeClaim, pv *corev1.PersistentVolume, sc *storagev1.StorageClass) (bool, error) { + return f.modifyResult, f.modifyError +} + +func (f *FakeVolumeModifier) MinWaitDuration() time.Duration { + return f.minWait +} + +func (f *FakeVolumeModifier) Validate(spvc, dpvc *corev1.PersistentVolumeClaim, ssc, dsc *storagev1.StorageClass) error { + return f.validateError +} diff --git a/pkg/volumes/raw_modifier.go b/pkg/volumes/raw_modifier.go new file mode 100644 index 0000000000..d08585693b --- /dev/null +++ b/pkg/volumes/raw_modifier.go @@ -0,0 +1,257 @@ +package volumes + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + + "github.com/pingcap/tidb-operator/pkg/client" + timeutils "github.com/pingcap/tidb-operator/pkg/utils/time" + "github.com/pingcap/tidb-operator/pkg/volumes/cloud" +) + +var _ Modifier = &rawModifier{} + +// rawModifier modifies volumes by calling cloud provider API. +type rawModifier struct { + k8sClient client.Client + logger logr.Logger + volumeModifier cloud.VolumeModifier + clock timeutils.Clock +} + +func NewRawModifier(modifier cloud.VolumeModifier, k8sClient client.Client, logger logr.Logger) Modifier { + return &rawModifier{ + k8sClient: k8sClient, + logger: logger, + clock: &timeutils.RealClock{}, + volumeModifier: modifier, + } +} + +func (m *rawModifier) GetActualVolume(ctx context.Context, expect, current *corev1.PersistentVolumeClaim) (*ActualVolume, error) { + pv, err := getBoundPVFromPVC(ctx, m.k8sClient, current) + if err != nil { + return nil, fmt.Errorf("failed to get bound PV from PVC %s/%s: %w", current.Namespace, current.Name, err) + } + + curSC, err := getStorageClassFromPVC(ctx, m.k8sClient, current) + if err != nil { + return nil, fmt.Errorf("failed to get StorageClass from PVC %s/%s: %w", current.Namespace, current.Name, err) + } + + desired := &DesiredVolume{ + Size: getStorageSize(expect.Spec.Resources.Requests), + StorageClassName: expect.Spec.StorageClassName, + } + if desired.StorageClassName != nil { + var sc storagev1.StorageClass + if err = m.k8sClient.Get(ctx, client.ObjectKey{Name: *desired.StorageClassName}, &sc); err != nil { + return nil, fmt.Errorf("failed to get StorageClass %s: %w", *desired.StorageClassName, err) + } + desired.StorageClass = &sc + } + + actual := ActualVolume{ + Desired: desired, + PVC: current, + PV: pv, + StorageClass: curSC, + } + return &actual, nil +} + +func (m *rawModifier) ShouldModify(_ context.Context, actual *ActualVolume) bool { + actual.Phase = m.getVolumePhase(actual) + return actual.Phase == VolumePhasePreparing || actual.Phase == VolumePhaseModifying +} + +func (m *rawModifier) Modify(ctx context.Context, vol *ActualVolume) error { + m.logger.Info(fmt.Sprintf("try to sync volume %s/%s, phase: %s", vol.PVC.Namespace, vol.PVC.Name, vol.Phase)) + + switch vol.Phase { + case VolumePhasePreparing: + if err := m.modifyPVCAnnoSpec(ctx, vol); err != nil { + return err + } + + fallthrough + case VolumePhaseModifying: + pvc := vol.PVC.DeepCopy() + pvc.Spec.Resources.Requests[corev1.ResourceStorage] = vol.Desired.Size + wait, err := m.volumeModifier.Modify(ctx, pvc, vol.PV, vol.Desired.StorageClass) + if err != nil { + return err + } + if wait { + return fmt.Errorf("wait for volume %s/%s modification completed", vol.PVC.Namespace, vol.PVC.Name) + } + + // try to resize fs + synced, err := m.syncPVCSize(ctx, vol) + if err != nil { + return err + } + if !synced { + return fmt.Errorf("wait for fs resize completed") + } + if err = m.modifyPVCAnnoStatus(ctx, vol); err != nil { + return err + } + default: + return fmt.Errorf("volume %s/%s is in phase %s, cannot modify", vol.PVC.Namespace, vol.PVC.Name, vol.Phase) + } + return nil +} + +func (m *rawModifier) modifyPVCAnnoStatus(ctx context.Context, vol *ActualVolume) error { + pvc := vol.PVC.DeepCopy() + + if pvc.Annotations == nil { + pvc.Annotations = map[string]string{} + } + + pvc.Annotations[annoKeyPVCStatusRevision] = pvc.Annotations[annoKeyPVCSpecRevision] + if scName := pvc.Annotations[annoKeyPVCSpecStorageClass]; scName != "" { + pvc.Annotations[annoKeyPVCStatusStorageClass] = scName + } + pvc.Annotations[annoKeyPVCStatusStorageSize] = pvc.Annotations[annoKeyPVCSpecStorageSize] + + err := m.k8sClient.Update(ctx, pvc) + if err != nil { + return err + } + + vol.PVC = pvc + return nil +} + +func (m *rawModifier) getVolumePhase(vol *ActualVolume) VolumePhase { + if err := m.validate(vol); err != nil { + m.logger.Info(fmt.Sprintf("volume %s/%s modification is not allowed: %v", vol.PVC.Namespace, vol.PVC.Name, err)) + return VolumePhaseCannotModify + } + if isPVCRevisionChanged(vol.PVC) { + return VolumePhaseModifying + } + + if !needModify(vol.PVC, vol.Desired) { + return VolumePhaseModified + } + + if m.waitForNextTime(vol.PVC) { + return VolumePhasePending + } + + return VolumePhasePreparing +} + +func (m *rawModifier) validate(vol *ActualVolume) error { + if vol.Desired == nil { + return fmt.Errorf("can't match desired volume") + } + desired := vol.Desired.GetStorageSize() + actual := vol.GetStorageSize() + result := desired.Cmp(actual) + if result < 0 { + return fmt.Errorf("can't shrunk size from %s to %s", &actual, &desired) + } + if result > 0 { + supported, err := isVolumeExpansionSupported(vol.StorageClass) + if err != nil { + m.logger.Info(fmt.Sprintf("volume expansion of storage class %s may be not supported, but it will be tried", vol.GetStorageClassName())) + } + if !supported { + return fmt.Errorf("volume expansion is not supported by storageclass %s", vol.StorageClass.Name) + } + } + + // if no pv permission but have sc permission: cannot change sc + if isStorageClassChanged(vol.GetStorageClassName(), vol.Desired.GetStorageClassName()) && vol.PV == nil { + return fmt.Errorf("cannot change storage class (%s to %s), because there is no permission to get persistent volume", vol.GetStorageClassName(), vol.Desired.GetStorageClassName()) + } + + desiredPVC := vol.PVC.DeepCopy() + desiredPVC.Spec.Resources.Requests[corev1.ResourceStorage] = desired + + return m.volumeModifier.Validate(vol.PVC, desiredPVC, vol.StorageClass, vol.Desired.StorageClass) +} + +func (m *rawModifier) waitForNextTime(pvc *corev1.PersistentVolumeClaim) bool { + str, ok := pvc.Annotations[annoKeyPVCLastTransitionTimestamp] + if !ok { + return false + } + timestamp, err := time.Parse(time.RFC3339, str) + if err != nil { + return false + } + + waitDur := defaultModifyWaitingDuration + if m != nil { + waitDur = m.volumeModifier.MinWaitDuration() + } + + if d := m.clock.Since(timestamp); d < waitDur { + m.logger.Info(fmt.Sprintf("volume %s/%s modification is pending, should wait %v", pvc.Namespace, pvc.Name, waitDur-d)) + return true + } + + return false +} + +func getStorageClassFromPVC(ctx context.Context, cli client.Client, pvc *corev1.PersistentVolumeClaim) (*storagev1.StorageClass, error) { + scName := getStorageClassNameFromPVC(pvc) + if scName == "" { + return nil, fmt.Errorf("StorageClass of pvc %s is not set", pvc.Name) + } + var sc storagev1.StorageClass + if err := cli.Get(ctx, client.ObjectKey{Name: scName}, &sc); err != nil { + return nil, fmt.Errorf("failed to get StorageClass %s: %w", scName, err) + } + return &sc, nil +} + +func (m *rawModifier) syncPVCSize(ctx context.Context, vol *ActualVolume) (bool, error) { + capacity := vol.PVC.Status.Capacity.Storage() + requestSize := vol.PVC.Spec.Resources.Requests.Storage() + if requestSize.Cmp(vol.Desired.Size) == 0 && capacity.Cmp(vol.Desired.Size) == 0 { + return true, nil + + } + + if requestSize.Cmp(vol.Desired.Size) == 0 { + return false, nil + } + + pvc := vol.PVC.DeepCopy() + pvc.Spec.Resources.Requests[corev1.ResourceStorage] = vol.Desired.Size + err := m.k8sClient.Update(ctx, pvc) + if err != nil { + return false, err + } + + vol.PVC = pvc + return false, nil +} + +func (m *rawModifier) modifyPVCAnnoSpec(ctx context.Context, vol *ActualVolume) error { + pvc := vol.PVC.DeepCopy() + size := vol.Desired.Size + scName := vol.Desired.GetStorageClassName() + + if isChanged := snapshotStorageClassAndSize(pvc, scName, size); isChanged { + upgradeRevision(pvc) + } + + setLastTransitionTimestamp(pvc) + if err := m.k8sClient.Update(ctx, pvc); err != nil { + return fmt.Errorf("failed to update PVC %s/%s: %w", pvc.Namespace, pvc.Name, err) + } + vol.PVC = pvc + return nil +} diff --git a/pkg/volumes/raw_modifier_test.go b/pkg/volumes/raw_modifier_test.go new file mode 100644 index 0000000000..b2a91dd777 --- /dev/null +++ b/pkg/volumes/raw_modifier_test.go @@ -0,0 +1,224 @@ +package volumes + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/go-logr/logr" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/utils/ptr" + + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/time" + "github.com/pingcap/tidb-operator/pkg/volumes/cloud" + "github.com/pingcap/tidb-operator/pkg/volumes/cloud/aws" +) + +func withPVCStatus(phase corev1.PersistentVolumeClaimPhase, size string) fake.ChangeFunc[corev1.PersistentVolumeClaim, *corev1.PersistentVolumeClaim] { + return func(pvc *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim { + pvc.Status.Phase = phase + pvc.Status.Capacity = corev1.ResourceList{} + pvc.Status.Capacity[corev1.ResourceStorage] = resource.MustParse(size) + return pvc + } +} + +func withPVCSpec(scName *string, vol, size string) fake.ChangeFunc[corev1.PersistentVolumeClaim, *corev1.PersistentVolumeClaim] { + return func(pvc *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim { + pvc.Spec.StorageClassName = scName + pvc.Spec.VolumeName = vol + pvc.Spec.Resources.Requests = corev1.ResourceList{} + pvc.Spec.Resources.Requests[corev1.ResourceStorage] = resource.MustParse(size) + return pvc + } +} + +func withParameters(params map[string]string) fake.ChangeFunc[storagev1.StorageClass, *storagev1.StorageClass] { + return func(sc *storagev1.StorageClass) *storagev1.StorageClass { + sc.Parameters = params + return sc + } +} + +func getObjectsFromActualVolume(vol *ActualVolume) []client.Object { + var objs []client.Object + if vol != nil { + if vol.Desired != nil && vol.Desired.StorageClass != nil { + objs = append(objs, vol.Desired.StorageClass) + } + if vol.StorageClass != nil { + objs = append(objs, vol.StorageClass) + } + if vol.PVC != nil { + objs = append(objs, vol.PVC) + } + if vol.PV != nil { + objs = append(objs, vol.PV) + } + } + return objs +} + +func Test_rawModifier_GetActualVolume(t *testing.T) { + tests := []struct { + name string + existingObjs []client.Object + desired *corev1.PersistentVolumeClaim + current *corev1.PersistentVolumeClaim + getState aws.GetVolumeStateFunc + expect func(*WithT, *ActualVolume) + wantErr bool + }{ + { + name: "happy path: no modification", + existingObjs: []client.Object{ + fake.FakeObj[corev1.PersistentVolume]("pv-0"), + fake.FakeObj[storagev1.StorageClass]("sc-0"), + }, + desired: fake.FakeObj[corev1.PersistentVolumeClaim]("pvc-0", withPVCSpec(ptr.To("sc-0"), "pv-0", "10Gi")), + current: fake.FakeObj[corev1.PersistentVolumeClaim]("pvc-0", withPVCStatus(corev1.ClaimBound, "10Gi"), withPVCSpec(ptr.To("sc-0"), "pv-0", "10Gi")), + getState: func(_ string) types.VolumeModificationState { + return types.VolumeModificationStateFailed + }, + expect: func(g *WithT, volume *ActualVolume) { + g.Expect(volume).ShouldNot(BeNil()) + g.Expect(volume.Desired).ShouldNot(BeNil()) + g.Expect(volume.Desired.Size).Should(Equal(resource.MustParse("10Gi"))) + g.Expect(volume.Desired.StorageClassName).Should(Equal(ptr.To("sc-0"))) + g.Expect(volume.Desired.StorageClass).ShouldNot(BeNil()) + + g.Expect(volume.PVC).ShouldNot(BeNil()) + g.Expect(volume.PV).ShouldNot(BeNil()) + g.Expect(volume.StorageClass).ShouldNot(BeNil()) + g.Expect(volume.Phase).Should(Equal(VolumePhaseUnknown)) + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := client.NewFakeClient(tt.existingObjs...) + m := NewRawModifier(aws.NewFakeEBSModifier(tt.getState), cli, logr.Discard()) + got, err := m.GetActualVolume(context.TODO(), tt.desired, tt.current) + if (err != nil) != tt.wantErr { + t.Errorf("GetActualVolume() error = %v, wantErr %v", err, tt.wantErr) + return + } + g := NewGomegaWithT(t) + if tt.expect != nil { + tt.expect(g, got) + } + }) + } +} + +func Test_rawModifier_getVolumePhase(t *testing.T) { + tests := []struct { + name string + volumeModifier cloud.VolumeModifier + clock time.Clock + vol *ActualVolume + want VolumePhase + }{ + { + name: "no need to modify", + vol: &ActualVolume{ + Desired: &DesiredVolume{ + Size: resource.MustParse("10Gi"), + StorageClassName: ptr.To("sc-0"), + }, + PVC: fake.FakeObj[corev1.PersistentVolumeClaim]("pvc-0", withPVCSpec(ptr.To("sc-0"), "pv-0", "10Gi"), withPVCStatus(corev1.ClaimBound, "10Gi")), + }, + volumeModifier: &cloud.FakeVolumeModifier{}, + want: VolumePhaseModified, + }, + { + name: "change storage class", + vol: &ActualVolume{ + Desired: &DesiredVolume{ + Size: resource.MustParse("10Gi"), + StorageClassName: ptr.To("sc-1"), + StorageClass: fake.FakeObj[storagev1.StorageClass]("sc-1", withParameters(map[string]string{"iops": "100"})), + }, + PVC: fake.FakeObj[corev1.PersistentVolumeClaim]("pvc-0", withPVCSpec(ptr.To("sc-0"), "pv-0", "10Gi"), withPVCStatus(corev1.ClaimBound, "10Gi")), + StorageClass: fake.FakeObj[storagev1.StorageClass]("sc-0"), + PV: fake.FakeObj[corev1.PersistentVolume]("pv-0"), + }, + volumeModifier: &cloud.FakeVolumeModifier{}, + want: VolumePhasePreparing, + }, + { + name: "increase size", + vol: &ActualVolume{ + Desired: &DesiredVolume{ + Size: resource.MustParse("100Gi"), + StorageClassName: ptr.To("sc-0"), + }, + PVC: fake.FakeObj[corev1.PersistentVolumeClaim]("pvc-0", withPVCSpec(ptr.To("sc-0"), "pv-0", "10Gi"), withPVCStatus(corev1.ClaimBound, "10Gi")), + }, + volumeModifier: &cloud.FakeVolumeModifier{}, + want: VolumePhasePreparing, + }, + { + name: "decrease size", + vol: &ActualVolume{ + Desired: &DesiredVolume{ + Size: resource.MustParse("1Gi"), + StorageClassName: ptr.To("sc-0"), + }, + PVC: fake.FakeObj[corev1.PersistentVolumeClaim]("pvc-0", withPVCSpec(ptr.To("sc-0"), "pv-0", "10Gi"), withPVCStatus(corev1.ClaimBound, "10Gi")), + }, + volumeModifier: &cloud.FakeVolumeModifier{}, + want: VolumePhaseCannotModify, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &rawModifier{ + k8sClient: client.NewFakeClient(getObjectsFromActualVolume(tt.vol)...), + logger: logr.Logger{}, + volumeModifier: tt.volumeModifier, + clock: tt.clock, + } + if got := m.getVolumePhase(tt.vol); got != tt.want { + t.Errorf("getVolumePhase() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_rawModifier_Modify(t *testing.T) { + tests := []struct { + name string + vol *ActualVolume + getState aws.GetVolumeStateFunc + wantErr bool + }{ + { + name: "can not modify", + vol: &ActualVolume{ + PVC: fake.FakeObj[corev1.PersistentVolumeClaim]("pvc-0"), + Phase: VolumePhaseModified, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &rawModifier{ + k8sClient: client.NewFakeClient(getObjectsFromActualVolume(tt.vol)...), + logger: logr.Discard(), + volumeModifier: aws.NewFakeEBSModifier(tt.getState), + clock: &time.RealClock{}, + } + if err := m.Modify(context.TODO(), tt.vol); (err != nil) != tt.wantErr { + t.Errorf("Modify() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/volumes/types.go b/pkg/volumes/types.go new file mode 100644 index 0000000000..608384a933 --- /dev/null +++ b/pkg/volumes/types.go @@ -0,0 +1,101 @@ +package volumes + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/api/resource" +) + +type Modifier interface { + GetActualVolume(ctx context.Context, expect, current *corev1.PersistentVolumeClaim) (*ActualVolume, error) + + ShouldModify(ctx context.Context, actual *ActualVolume) bool + + Modify(ctx context.Context, vol *ActualVolume) error +} + +type DesiredVolume struct { + Size resource.Quantity + // it may be nil if there is no permission to get storage class + StorageClass *storagev1.StorageClass + // it is sc name specified by user + // the sc may not exist + StorageClassName *string +} + +// GetStorageClassName may return empty because SC is unset or no permission to verify the existence of SC. +func (v *DesiredVolume) GetStorageClassName() string { + if v.StorageClassName == nil { + return "" + } + return *v.StorageClassName +} + +func (v *DesiredVolume) GetStorageSize() resource.Quantity { + return v.Size +} + +type ActualVolume struct { + Desired *DesiredVolume + PVC *corev1.PersistentVolumeClaim + Phase VolumePhase + // PV may be nil if there is no permission to get pvc + PV *corev1.PersistentVolume + // StorageClass may be nil if there is no permission to get storage class + StorageClass *storagev1.StorageClass +} + +func (v *ActualVolume) GetStorageClassName() string { + return getStorageClassNameFromPVC(v.PVC) +} + +func (v *ActualVolume) GetStorageSize() resource.Quantity { + return getStorageSize(v.PVC.Status.Capacity) +} + +type VolumePhase int + +const ( + VolumePhaseUnknown VolumePhase = iota + // VolumePhasePending will be set when: + // 1. isPVCRevisionChanged: false + // 2. needModify: true + // 3. waitForNextTime: true + VolumePhasePending + // VolumePhasePreparing will be set when: + // 1. isPVCRevisionChanged: false + // 2. needModify: true + // 3. waitForNextTime: false + VolumePhasePreparing + // VolumePhaseModifying will be set when: + // 1. isPVCRevisionChanged: true + // 2. needModify: true/false + // 3. waitForNextTime: true/false + VolumePhaseModifying + // VolumePhaseModified will be set when: + // 1. isPVCRevisionChanged: false + // 2. needModify: false + // 3. waitForNextTime: true/false + VolumePhaseModified + + VolumePhaseCannotModify +) + +func (p VolumePhase) String() string { + switch p { + case VolumePhasePending: + return "Pending" + case VolumePhasePreparing: + return "Preparing" + case VolumePhaseModifying: + return "Modifying" + case VolumePhaseModified: + return "Modified" + case VolumePhaseCannotModify: + return "CannotModify" + default: + return "Unknown" + } +} diff --git a/pkg/volumes/utils.go b/pkg/volumes/utils.go new file mode 100644 index 0000000000..ca7ca0f568 --- /dev/null +++ b/pkg/volumes/utils.go @@ -0,0 +1,188 @@ +package volumes + +import ( + "context" + "fmt" + "strconv" + "time" + + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + + "github.com/pingcap/tidb-operator/pkg/client" +) + +const ( + annoKeyPVCSpecRevision = "spec.tidb.pingcap.com/revision" + annoKeyPVCSpecStorageClass = "spec.tidb.pingcap.com/storage-class" + annoKeyPVCSpecStorageSize = "spec.tidb.pingcap.com/storage-size" + + annoKeyPVCStatusRevision = "status.tidb.pingcap.com/revision" + annoKeyPVCStatusStorageClass = "status.tidb.pingcap.com/storage-class" + annoKeyPVCStatusStorageSize = "status.tidb.pingcap.com/storage-size" + + annoKeyPVCLastTransitionTimestamp = "status.tidb.pingcap.com/last-transition-timestamp" + + defaultModifyWaitingDuration = time.Minute * 1 +) + +func getBoundPVFromPVC(ctx context.Context, cli client.Client, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolume, error) { + if pvc.Status.Phase != corev1.ClaimBound { + return nil, fmt.Errorf("pvc %s/%s is not bound", pvc.Namespace, pvc.Name) + } + + name := pvc.Spec.VolumeName + var pv corev1.PersistentVolume + if err := cli.Get(ctx, client.ObjectKey{Name: name}, &pv); err != nil { + return nil, fmt.Errorf("failed to get PV %s: %w", name, err) + } + + return &pv, nil +} + +func getStorageSize(r corev1.ResourceList) resource.Quantity { + return r[corev1.ResourceStorage] +} + +func ignoreNil(s *string) string { + if s == nil { + return "" + } + return *s +} + +func setLastTransitionTimestamp(pvc *corev1.PersistentVolumeClaim) { + if pvc.Annotations == nil { + pvc.Annotations = map[string]string{} + } + + pvc.Annotations[annoKeyPVCLastTransitionTimestamp] = metav1.Now().Format(time.RFC3339) +} + +func upgradeRevision(pvc *corev1.PersistentVolumeClaim) { + rev := 1 + str, ok := pvc.Annotations[annoKeyPVCSpecRevision] + if ok { + oldRev, err := strconv.Atoi(str) + if err != nil { + klog.Warningf("revision format err: %v, reset to 0", err) + oldRev = 0 + } + rev = oldRev + 1 + } + + if pvc.Annotations == nil { + pvc.Annotations = map[string]string{} + } + + pvc.Annotations[annoKeyPVCSpecRevision] = strconv.Itoa(rev) +} + +// isPVCSpecMatched checks if the storage class or storage size of the PVC is changed. +func isPVCSpecMatched(pvc *corev1.PersistentVolumeClaim, scName string, size resource.Quantity) bool { + isChanged := false + + oldSc := ignoreNil(pvc.Spec.StorageClassName) + scAnno, ok := pvc.Annotations[annoKeyPVCSpecStorageClass] + if ok && scAnno != "" { + oldSc = scAnno + } + + if scName != "" && oldSc != scName { + isChanged = true + } + + oldSize, ok := pvc.Annotations[annoKeyPVCSpecStorageSize] + if !ok { + quantity := getStorageSize(pvc.Spec.Resources.Requests) + oldSize = quantity.String() + } + if oldSize != size.String() { + isChanged = true + } + + return isChanged +} + +func snapshotStorageClassAndSize(pvc *corev1.PersistentVolumeClaim, scName string, size resource.Quantity) bool { + isChanged := isPVCSpecMatched(pvc, scName, size) + + if pvc.Annotations == nil { + pvc.Annotations = map[string]string{} + } + + if scName != "" { + pvc.Annotations[annoKeyPVCSpecStorageClass] = scName + } + pvc.Annotations[annoKeyPVCSpecStorageSize] = size.String() + + return isChanged +} + +func getStorageClassNameFromPVC(pvc *corev1.PersistentVolumeClaim) string { + sc := ignoreNil(pvc.Spec.StorageClassName) + + scAnno, ok := pvc.Annotations[annoKeyPVCStatusStorageClass] + if ok && scAnno != "" { + sc = scAnno + } + + return sc +} + +func needModify(pvc *corev1.PersistentVolumeClaim, desired *DesiredVolume) bool { + size := desired.Size + scName := desired.GetStorageClassName() + + return isPVCStatusMatched(pvc, scName, size) +} + +func isPVCStatusMatched(pvc *corev1.PersistentVolumeClaim, scName string, size resource.Quantity) bool { + oldSc := getStorageClassNameFromPVC(pvc) + isChanged := isStorageClassChanged(oldSc, scName) + + oldSize, ok := pvc.Annotations[annoKeyPVCStatusStorageSize] + if !ok { + quantity := getStorageSize(pvc.Spec.Resources.Requests) + oldSize = quantity.String() + } + if oldSize != size.String() { + isChanged = true + } + if isChanged { + klog.Infof("volume %s/%s is changed, sc (%s => %s), size (%s => %s)", pvc.Namespace, pvc.Name, oldSc, scName, oldSize, size.String()) + } + + return isChanged +} + +func isStorageClassChanged(pre, cur string) bool { + if cur != "" && pre != cur { + return true + } + return false +} + +func isPVCRevisionChanged(pvc *corev1.PersistentVolumeClaim) bool { + specRevision, statusRevision := pvc.Annotations[annoKeyPVCSpecRevision], pvc.Annotations[annoKeyPVCStatusRevision] + return specRevision != statusRevision +} + +func isVolumeExpansionSupported(sc *storagev1.StorageClass) (bool, error) { + if sc == nil { + // always assume expansion is supported + return true, fmt.Errorf("expansion cap of volume is unknown") + } + if sc.AllowVolumeExpansion == nil { + return false, nil + } + return *sc.AllowVolumeExpansion, nil +} + +func isStorageSizeChanged(pvc1, pvc2 *corev1.PersistentVolumeClaim) bool { + expectSize, currentSize := getStorageSize(pvc1.Spec.Resources.Requests), getStorageSize(pvc2.Spec.Resources.Requests) + return expectSize.Cmp(currentSize) != 0 +}