From e341dd55764d9acc72e7157aee185099b30d6e54 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 14 Sep 2022 19:47:18 +0800 Subject: [PATCH 1/7] retry one reset by perr. --- pkg/errorutil/ignore.go | 5 +++++ pkg/errorutil/ignore_test.go | 2 ++ 2 files changed, 7 insertions(+) diff --git a/pkg/errorutil/ignore.go b/pkg/errorutil/ignore.go index 0b5d5fe1d79..07d1968f7dc 100644 --- a/pkg/errorutil/ignore.go +++ b/pkg/errorutil/ignore.go @@ -84,5 +84,10 @@ func IsRetryableEtcdError(err error) bool { if strings.Contains(etcdErr.Error(), "received prior goaway: code: NO_ERROR") { return true } + + // this may happen if the PD instance shutdown by `kill -9`, no matter the instance is the leader or not. + if strings.Contains(etcdErr.Error(), "connection reset by peer") { + return true + } return false } diff --git a/pkg/errorutil/ignore_test.go b/pkg/errorutil/ignore_test.go index 1cbe6ce9773..802a267fd18 100644 --- a/pkg/errorutil/ignore_test.go +++ b/pkg/errorutil/ignore_test.go @@ -63,6 +63,8 @@ func TestIsRetryableEtcdError(t *testing.T) { {errors.New("rpc error: code = Unavailable desc = closing transport due to: " + "connection error: desc = \\\"error reading from server: EOF\\\", " + "received prior goaway: code: NO_ERROR\""), true}, + {errors.New("rpc error: code = Unavailable desc = error reading from server: " + + "xxx: read: connection reset by peer"), true}, } for _, item := range cases { From e80e8571473b00a4cf04f0b612d7b657279464a1 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 15 Sep 2022 14:38:54 +0800 Subject: [PATCH 2/7] enlarge the session ttl to 60s. --- pkg/config/server_config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index a25de95a74b..98914f384e4 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -98,7 +98,7 @@ var defaultServerConfig = &ServerConfig{ // which is calculated by `math.Ceil(3 * election-timeout / 2)`, we choose // default capture session ttl to 10s to increase robust to PD jitter, // however it will decrease RTO when single TiCDC node error happens. - CaptureSessionTTL: 10, + CaptureSessionTTL: 60, OwnerFlushInterval: TomlDuration(200 * time.Millisecond), ProcessorFlushInterval: TomlDuration(100 * time.Millisecond), Sorter: &SorterConfig{ From c71d513f0be43553b052e22d566730ba94e23379 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 15 Sep 2022 14:42:29 +0800 Subject: [PATCH 3/7] ttl to 20 --- pkg/config/server_config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 98914f384e4..2dff52050df 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -98,7 +98,7 @@ var defaultServerConfig = &ServerConfig{ // which is calculated by `math.Ceil(3 * election-timeout / 2)`, we choose // default capture session ttl to 10s to increase robust to PD jitter, // however it will decrease RTO when single TiCDC node error happens. - CaptureSessionTTL: 60, + CaptureSessionTTL: 20, OwnerFlushInterval: TomlDuration(200 * time.Millisecond), ProcessorFlushInterval: TomlDuration(100 * time.Millisecond), Sorter: &SorterConfig{ From 04547f650400b6aadc73f6f344ff284b487ed4ca Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 15 Sep 2022 18:40:16 +0800 Subject: [PATCH 4/7] enlarge to 30 --- pkg/config/server_config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 2dff52050df..29093f02b8b 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -98,7 +98,7 @@ var defaultServerConfig = &ServerConfig{ // which is calculated by `math.Ceil(3 * election-timeout / 2)`, we choose // default capture session ttl to 10s to increase robust to PD jitter, // however it will decrease RTO when single TiCDC node error happens. - CaptureSessionTTL: 20, + CaptureSessionTTL: 30, OwnerFlushInterval: TomlDuration(200 * time.Millisecond), ProcessorFlushInterval: TomlDuration(100 * time.Millisecond), Sorter: &SorterConfig{ From d08e8da486aaaf4b3a9469b8440db00605b78c67 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 16 Sep 2022 16:03:51 +0800 Subject: [PATCH 5/7] fix may owner panic. --- cdc/api/v1/api.go | 9 ++++++++- pkg/config/server_config.go | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index d22d5b35555..43bd8dd2ead 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -874,7 +874,14 @@ func (h *OpenAPI) ServerStatus(c *gin.Context) { func (h *OpenAPI) Health(c *gin.Context) { ctx := c.Request.Context() - health, err := h.statusProvider().IsHealthy(ctx) + provider := h.statusProvider() + if provider == nil { + log.Warn("healthy: status provider is nil") + c.JSON(http.StatusInternalServerError, model.NewHTTPError(cerror.ErrOwnerNotFound.FastGenByArgs())) + return + } + + health, err := provider.IsHealthy(ctx) if err != nil { c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) return diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 29093f02b8b..a25de95a74b 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -98,7 +98,7 @@ var defaultServerConfig = &ServerConfig{ // which is calculated by `math.Ceil(3 * election-timeout / 2)`, we choose // default capture session ttl to 10s to increase robust to PD jitter, // however it will decrease RTO when single TiCDC node error happens. - CaptureSessionTTL: 30, + CaptureSessionTTL: 10, OwnerFlushInterval: TomlDuration(200 * time.Millisecond), ProcessorFlushInterval: TomlDuration(100 * time.Millisecond), Sorter: &SorterConfig{ From be3824b60c4da56e0d07ad9eba020e4462edfa46 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 16 Sep 2022 16:26:42 +0800 Subject: [PATCH 6/7] fix healthy panic. --- cdc/api/v1/api.go | 5 +++-- cdc/api/v1/api_test.go | 20 +++++++++++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index 43bd8dd2ead..1d35d398250 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -874,10 +874,11 @@ func (h *OpenAPI) ServerStatus(c *gin.Context) { func (h *OpenAPI) Health(c *gin.Context) { ctx := c.Request.Context() + var err error provider := h.statusProvider() if provider == nil { - log.Warn("healthy: status provider is nil") - c.JSON(http.StatusInternalServerError, model.NewHTTPError(cerror.ErrOwnerNotFound.FastGenByArgs())) + err = cerror.ErrOwnerNotFound.FastGenByArgs() + c.JSON(http.StatusInternalServerError, model.NewHTTPError(err)) return } diff --git a/cdc/api/v1/api_test.go b/cdc/api/v1/api_test.go index 0740beb9df5..cabb5f8832b 100644 --- a/cdc/api/v1/api_test.go +++ b/cdc/api/v1/api_test.go @@ -896,11 +896,21 @@ func TestHealth(t *testing.T) { // capture is owner ctrl := gomock.NewController(t) cp := mock_capture.NewMockCapture(ctrl) - sp := mock_owner.NewMockStatusProvider(ctrl) - ownerRouter := newRouter(cp, sp) - api := testCase{url: "/api/v1/health", method: "GET"} + ownerRouter := newRouter(cp, nil) cp.EXPECT().IsReady().Return(true).AnyTimes() + cp.EXPECT().StatusProvider().Return(nil).Times(1) + + api := testCase{url: "/api/v1/health", method: "GET"} + + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil) + ownerRouter.ServeHTTP(w, req) + require.Equal(t, 500, w.Code) + + sp := mock_owner.NewMockStatusProvider(ctrl) + ownerRouter = newRouter(cp, sp) + cp.EXPECT().Info().DoAndReturn(func() (model.CaptureInfo, error) { return model.CaptureInfo{}, nil }).AnyTimes() @@ -911,8 +921,8 @@ func TestHealth(t *testing.T) { // IsHealthy returns error. isHealthError := sp.EXPECT().IsHealthy(gomock.Any()). Return(false, cerror.ErrOwnerNotFound) - w := httptest.NewRecorder() - req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil) + w = httptest.NewRecorder() + req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil) ownerRouter.ServeHTTP(w, req) require.Equal(t, 500, w.Code) From b702481ec951ddc86ba06df7d40de4db5ed97581 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 16 Sep 2022 16:40:02 +0800 Subject: [PATCH 7/7] forward healthy to owner. --- cdc/api/v1/api.go | 12 ++++-------- cdc/api/v1/api_test.go | 17 ++++------------- 2 files changed, 8 insertions(+), 21 deletions(-) diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index 1d35d398250..bbca9d5aeae 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -872,17 +872,13 @@ func (h *OpenAPI) ServerStatus(c *gin.Context) { // @Failure 500 {object} model.HTTPError // @Router /api/v1/health [get] func (h *OpenAPI) Health(c *gin.Context) { - ctx := c.Request.Context() - - var err error - provider := h.statusProvider() - if provider == nil { - err = cerror.ErrOwnerNotFound.FastGenByArgs() - c.JSON(http.StatusInternalServerError, model.NewHTTPError(err)) + if !h.capture.IsOwner() { + middleware.ForwardToOwnerMiddleware(h.capture)(c) return } - health, err := provider.IsHealthy(ctx) + ctx := c.Request.Context() + health, err := h.statusProvider().IsHealthy(ctx) if err != nil { c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) return diff --git a/cdc/api/v1/api_test.go b/cdc/api/v1/api_test.go index cabb5f8832b..e92215e30de 100644 --- a/cdc/api/v1/api_test.go +++ b/cdc/api/v1/api_test.go @@ -896,21 +896,12 @@ func TestHealth(t *testing.T) { // capture is owner ctrl := gomock.NewController(t) cp := mock_capture.NewMockCapture(ctrl) - ownerRouter := newRouter(cp, nil) - - cp.EXPECT().IsReady().Return(true).AnyTimes() - cp.EXPECT().StatusProvider().Return(nil).Times(1) api := testCase{url: "/api/v1/health", method: "GET"} - - w := httptest.NewRecorder() - req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil) - ownerRouter.ServeHTTP(w, req) - require.Equal(t, 500, w.Code) - sp := mock_owner.NewMockStatusProvider(ctrl) - ownerRouter = newRouter(cp, sp) + ownerRouter := newRouter(cp, sp) + cp.EXPECT().IsReady().Return(true).AnyTimes() cp.EXPECT().Info().DoAndReturn(func() (model.CaptureInfo, error) { return model.CaptureInfo{}, nil }).AnyTimes() @@ -921,8 +912,8 @@ func TestHealth(t *testing.T) { // IsHealthy returns error. isHealthError := sp.EXPECT().IsHealthy(gomock.Any()). Return(false, cerror.ErrOwnerNotFound) - w = httptest.NewRecorder() - req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil) + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil) ownerRouter.ServeHTTP(w, req) require.Equal(t, 500, w.Code)