Skip to content

Commit

Permalink
ticdc: support graceful upgrade TiCDC pods (pingcap#4647)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus authored and xhebox committed Sep 16, 2022
1 parent 9dc24f9 commit 2df0ecd
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 22 deletions.
16 changes: 16 additions & 0 deletions docs/api-references/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -14521,6 +14521,22 @@ string
Defaults to Kubernetes default storage class.</p>
</td>
</tr>
<tr>
<td>
<code>gracefulShutdownTimeout</code></br>
<em>
<a href="https://godoc.org/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>GracefulShutdownTimeout is the timeout of gracefully shutdown a TiCDC pod.
Encoded in the format of Go Duration.
Defaults to 10m</p>
</td>
</tr>
</tbody>
</table>
<h3 id="ticdcstatus">TiCDCStatus</h3>
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21443,6 +21443,8 @@ spec:
type: object
type: object
type: array
gracefulShutdownTimeout:
type: string
hostNetwork:
type: boolean
image:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd/v1/pingcap.com_tidbclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9134,6 +9134,8 @@ spec:
type: object
type: object
type: array
gracefulShutdownTimeout:
type: string
hostNetwork:
type: boolean
image:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd/v1beta1/pingcap.com_tidbclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9122,6 +9122,8 @@ spec:
type: object
type: object
type: array
gracefulShutdownTimeout:
type: string
hostNetwork:
type: boolean
image:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd_v1beta1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21429,6 +21429,8 @@ spec:
type: object
type: object
type: array
gracefulShutdownTimeout:
type: string
hostNetwork:
type: boolean
image:
Expand Down
8 changes: 7 additions & 1 deletion pkg/apis/pingcap/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 17 additions & 5 deletions pkg/apis/pingcap/v1alpha1/tidbcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (
defaultEnablePVReclaim = false
// defaultEvictLeaderTimeout is the timeout limit of evict leader
defaultEvictLeaderTimeout = 1500 * time.Minute
// defaultTiCDCGracefulShutdownTimeout is the timeout limit of graceful
// shutdown a TiCDC pod.
defaultTiCDCGracefulShutdownTimeout = 10 * time.Minute
)

var (
Expand Down Expand Up @@ -194,6 +197,14 @@ func (tc *TidbCluster) TiFlashVersion() string {
return "latest"
}

func (tc *TidbCluster) TiFlashContainerPrivilege() *bool {
if tc.Spec.TiFlash == nil || tc.Spec.TiFlash.Privileged == nil {
pri := false
return &pri
}
return tc.Spec.TiFlash.Privileged
}

// TiCDCImage return the image used by TiCDC.
//
// If TiCDC isn't specified, return empty string.
Expand All @@ -219,12 +230,13 @@ func (tc *TidbCluster) TiCDCImage() string {
return image
}

func (tc *TidbCluster) TiFlashContainerPrivilege() *bool {
if tc.Spec.TiFlash == nil || tc.Spec.TiFlash.Privileged == nil {
pri := false
return &pri
// TiCDCGracefulShutdownTimeout returns the timeout of gracefully shutdown
// a TiCDC pod.
func (tc *TidbCluster) TiCDCGracefulShutdownTimeout() time.Duration {
if tc.Spec.TiCDC != nil && tc.Spec.TiCDC.GracefulShutdownTimeout != nil {
return tc.Spec.TiCDC.GracefulShutdownTimeout.Duration
}
return tc.Spec.TiFlash.Privileged
return defaultTiCDCGracefulShutdownTimeout
}

// TiDBImage return the image used by TiDB.
Expand Down
14 changes: 14 additions & 0 deletions pkg/apis/pingcap/v1alpha1/tidbcluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package v1alpha1

import (
"testing"
"time"

. "github.com/onsi/gomega"
apps "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -672,6 +673,19 @@ func TestPDVersion(t *testing.T) {
}
}

func TestTiCDCGracefulShutdownTimeout(t *testing.T) {
g := NewGomegaWithT(t)

tc := newTidbCluster()
g.Expect(tc.TiCDCGracefulShutdownTimeout()).To(Equal(defaultTiCDCGracefulShutdownTimeout))

tc.Spec.TiCDC = &TiCDCSpec{GracefulShutdownTimeout: nil}
g.Expect(tc.TiCDCGracefulShutdownTimeout()).To(Equal(defaultTiCDCGracefulShutdownTimeout))

tc.Spec.TiCDC = &TiCDCSpec{GracefulShutdownTimeout: &metav1.Duration{Duration: time.Minute}}
g.Expect(tc.TiCDCGracefulShutdownTimeout()).To(Equal(time.Minute))
}

func TestComponentFunc(t *testing.T) {
t.Run("ComponentIsNormal", func(t *testing.T) {
g := NewGomegaWithT(t)
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,12 @@ type TiCDCSpec struct {
// Defaults to Kubernetes default storage class.
// +optional
StorageClassName *string `json:"storageClassName,omitempty"`

// GracefulShutdownTimeout is the timeout of gracefully shutdown a TiCDC pod.
// Encoded in the format of Go Duration.
// Defaults to 10m
// +optional
GracefulShutdownTimeout *metav1.Duration `json:"gracefulShutdownTimeout,omitempty"`
}

// TiCDCConfig is the configuration of tidbcdc
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions pkg/manager/member/ticdc_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,6 @@ func gracefulShutdownTiCDC(
return nil
}

// TODO: support configurable graceful shutdown timeout.
var ticdcGracefulShutdownTimeout time.Duration = 10 * time.Second

func checkTiCDCGracefulShutdownTimeout(
tc *v1alpha1.TidbCluster,
podCtl controller.PodControlInterface,
Expand All @@ -186,7 +183,7 @@ func checkTiCDCGracefulShutdownTimeout(
return true, nil
}

gracefulShutdownTimeout := ticdcGracefulShutdownTimeout
gracefulShutdownTimeout := tc.TiCDCGracefulShutdownTimeout()
if time.Now().After(beginTime.Add(gracefulShutdownTimeout)) {
klog.Infof("ticdc.%s: graceful shutdown timeout (threshold: %v) for Pod %s in cluster %s/%s",
action, gracefulShutdownTimeout, podName, ns, tc.GetName())
Expand Down
1 change: 1 addition & 0 deletions pkg/manager/member/ticdc_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func TestTiCDCGracefulShutdown(t *testing.T) {

tc := newTidbClusterForPD()
tc.Spec.TiCDC = &v1alpha1.TiCDCSpec{}
ticdcGracefulShutdownTimeout := tc.TiCDCGracefulShutdownTimeout()
newPod := func() *corev1.Pod {
return &corev1.Pod{
TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
Expand Down
15 changes: 11 additions & 4 deletions pkg/manager/member/ticdc_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func (u *ticdcUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulS

mngerutils.SetUpgradePartition(newSet, *oldSet.Spec.UpdateStrategy.RollingUpdate.Partition)
podOrdinals := helper.GetPodOrdinals(*oldSet.Spec.Replicas, oldSet).List()
for _i := len(podOrdinals) - 1; _i >= 0; _i-- {
i := podOrdinals[_i]
podName := ticdcPodName(tcName, i)
for i := len(podOrdinals) - 1; i >= 0; i-- {
ordinal := podOrdinals[i]
podName := ticdcPodName(tcName, ordinal)
pod, err := u.deps.PodLister.Pods(ns).Get(podName)
if err != nil {
return fmt.Errorf("ticdcUpgrader.Upgrade: failed to get pod %s for cluster %s/%s, error: %s", podName, ns, tcName, err)
Expand All @@ -108,7 +108,14 @@ func (u *ticdcUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulS
}
continue
}
mngerutils.SetUpgradePartition(newSet, i)

err = gracefulShutdownTiCDC(tc, u.deps.CDCControl, u.deps.PodControl, pod, ordinal, "Upgrade")
if err != nil {
return err
}
klog.Infof("ticdcUpgrade.Upgrade: %s has graceful shutdown in cluster %s/%s", podName, tc.GetNamespace(), tc.GetName())

mngerutils.SetUpgradePartition(newSet, ordinal)
return nil
}

Expand Down
36 changes: 28 additions & 8 deletions pkg/manager/member/ticdc_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ func TestTiCDCUpgrader_Upgrade(t *testing.T) {
g := NewGomegaWithT(t)

type testcase struct {
name string
changeFn func(*v1alpha1.TidbCluster)
invalidPod bool
changePods func(pods []*corev1.Pod)
missPod bool
errorExpect bool
changeOldSet func(set *apps.StatefulSet)
expectFn func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet)
name string
changeFn func(*v1alpha1.TidbCluster)
invalidPod bool
changePods func(pods []*corev1.Pod)
missPod bool
errorExpect bool
changeOldSet func(set *apps.StatefulSet)
changeUpgrader func(u *ticdcUpgrader)
expectFn func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet)
}

testFn := func(test *testcase, t *testing.T) {
Expand All @@ -61,6 +62,9 @@ func TestTiCDCUpgrader_Upgrade(t *testing.T) {
if test.changePods != nil {
test.changePods(pods)
}
if test.changeUpgrader != nil {
test.changeUpgrader(upgrader.(*ticdcUpgrader))
}
for _, pod := range pods {
podInformer.Informer().GetIndexer().Add(pod)
}
Expand Down Expand Up @@ -89,6 +93,22 @@ func TestTiCDCUpgrader_Upgrade(t *testing.T) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(0)))
},
},
{
name: "graceful upgrade retry",
errorExpect: true,
changeUpgrader: func(u *ticdcUpgrader) {
u.deps.CDCControl = &cdcCtlMock{
// resignOwner returns false to let graceful shutdown retry.
resignOwner: func(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) {
return false, nil
},
}
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.TiCDC.Phase).To(Equal(v1alpha1.UpgradePhase))
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
},
},
{
name: "normal with pod notReady",
changePods: func(pods []*corev1.Pod) {
Expand Down

0 comments on commit 2df0ecd

Please sign in to comment.