Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Merge branch 'master' into cli-use-open-api and resolve conflicts" #6151

Merged
merged 1 commit into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,8 @@ swagger-spec: tools/bin/swag
generate_mock: tools/bin/mockgen
tools/bin/mockgen -source cdc/owner/owner.go -destination cdc/owner/mock/owner_mock.go
tools/bin/mockgen -source cdc/api/v2/api_helpers.go -destination cdc/api/v2/api_helpers_mock.go -package v2
tools/bin/mockgen -source cdc/capture/info_for_api.go -destination cdc/capture/mock/capture_info_mock.go
tools/bin/mockgen -source pkg/etcd/client_for_api.go -destination pkg/etcd/mock/etcd_client_mock.go
tools/bin/mockgen -source cdc/processor/manager.go -destination cdc/processor/mock/manager_mock.go
tools/bin/mockgen -source cdc/capture/capture.go -destination cdc/capture/mock/capture_mock.go

clean:
go clean -i ./...
rm -rf *.out
Expand Down
4 changes: 2 additions & 2 deletions cdc/api/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func ErrorHandleMiddleware() gin.HandlerFunc {

// ForwardToOwnerMiddleware forward an request to owner if current server
// is not owner, or handle it locally.
func ForwardToOwnerMiddleware(p capture.Capture) gin.HandlerFunc {
func ForwardToOwnerMiddleware(p capture.InfoForAPI) gin.HandlerFunc {
return func(ctx *gin.Context) {
if !p.IsOwner() {
api.ForwardToOwner(ctx, p)
Expand All @@ -97,7 +97,7 @@ func ForwardToOwnerMiddleware(p capture.Capture) gin.HandlerFunc {
}

// CheckServerReadyMiddleware checks if the server is ready
func CheckServerReadyMiddleware(capture capture.Capture) gin.HandlerFunc {
func CheckServerReadyMiddleware(capture capture.InfoForAPI) gin.HandlerFunc {
return func(c *gin.Context) {
if capture.IsReady() {
c.Next()
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/middleware/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ import (
"github.com/stretchr/testify/require"
)

type testCapture struct {
capture.Capture
type testCaptureInfoProvider struct {
capture.InfoForAPI
ready bool
}

func (c *testCapture) IsReady() bool {
func (c *testCaptureInfoProvider) IsReady() bool {
return c.ready
}

func TestCheckServerReadyMiddleware(t *testing.T) {
capture := &testCapture{ready: false}
capture := &testCaptureInfoProvider{ready: false}
router := gin.New()
router.Use(CheckServerReadyMiddleware(capture))
router.Use(LogMiddleware())
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ func (c ChangefeedResp) MarshalJSON() ([]byte, error) {

// ownerAPI provides owner APIs.
type ownerAPI struct {
capture capture.Capture
capture *capture.Capture
}

// RegisterOwnerAPIRoutes registers routes for owner APIs.
func RegisterOwnerAPIRoutes(router *gin.Engine, capture capture.Capture) {
func RegisterOwnerAPIRoutes(router *gin.Engine, capture *capture.Capture) {
ownerAPI := ownerAPI{capture: capture}
owner := router.Group("/capture/owner")

Expand Down Expand Up @@ -228,13 +228,13 @@ func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Reques
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cfInfo, err := h.capture.GetEtcdClient().GetChangeFeedInfo(ctx, changefeedID)
cfInfo, err := h.capture.EtcdClient.GetChangeFeedInfo(ctx, changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID))
return
}
cfStatus, _, err := h.capture.GetEtcdClient().GetChangeFeedStatus(ctx, changefeedID)
cfStatus, _, err := h.capture.EtcdClient.GetChangeFeedStatus(ctx, changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
api.WriteError(w, http.StatusBadRequest, err)
return
Expand Down
6 changes: 3 additions & 3 deletions cdc/api/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ type status struct {
}

type statusAPI struct {
capture capture.Capture
capture *capture.Capture
}

// RegisterStatusAPIRoutes registers routes for status.
func RegisterStatusAPIRoutes(router *gin.Engine, capture capture.Capture) {
func RegisterStatusAPIRoutes(router *gin.Engine, capture *capture.Capture) {
statusAPI := statusAPI{capture: capture}
router.GET("/status", gin.WrapF(statusAPI.handleStatus))
router.GET("/debug/info", gin.WrapF(statusAPI.handleDebugInfo))
Expand All @@ -65,7 +65,7 @@ func (h *statusAPI) handleDebugInfo(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
h.capture.WriteDebugInfo(ctx, w)
fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n")
h.writeEtcdInfo(ctx, h.capture.GetEtcdClient(), w)
h.writeEtcdInfo(ctx, h.capture.EtcdClient, w)
}

func (h *statusAPI) handleStatus(w http.ResponseWriter, req *http.Request) {
Expand Down
10 changes: 5 additions & 5 deletions cdc/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func WriteData(w http.ResponseWriter, data interface{}) {

// HandleOwnerJob enqueue the admin job
func HandleOwnerJob(
ctx context.Context, capture capture.Capture, job model.AdminJob,
ctx context.Context, capture capture.InfoForAPI, job model.AdminJob,
) error {
// Use buffered channel to prevent blocking owner from happening.
done := make(chan error, 1)
Expand All @@ -113,7 +113,7 @@ func HandleOwnerJob(

// HandleOwnerBalance balance the changefeed tables
func HandleOwnerBalance(
ctx context.Context, capture capture.Capture, changefeedID model.ChangeFeedID,
ctx context.Context, capture capture.InfoForAPI, changefeedID model.ChangeFeedID,
) error {
// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
Expand All @@ -132,7 +132,7 @@ func HandleOwnerBalance(

// HandleOwnerScheduleTable schedule tables
func HandleOwnerScheduleTable(
ctx context.Context, capture capture.Capture,
ctx context.Context, capture capture.InfoForAPI,
changefeedID model.ChangeFeedID, captureID string, tableID int64,
) error {
// Use buffered channel to prevent blocking owner.
Expand All @@ -151,7 +151,7 @@ func HandleOwnerScheduleTable(
}

// ForwardToOwner forwards an request to the owner
func ForwardToOwner(c *gin.Context, p capture.Capture) {
func ForwardToOwner(c *gin.Context, p capture.InfoForAPI) {
ctx := c.Request.Context()
// every request can only forward to owner one time
if len(c.GetHeader(forwardFromCapture)) != 0 {
Expand Down Expand Up @@ -233,7 +233,7 @@ func ForwardToOwner(c *gin.Context, p capture.Capture) {

// HandleOwnerDrainCapture schedule drain the target capture
func HandleOwnerDrainCapture(
ctx context.Context, capture capture.Capture, captureID string,
ctx context.Context, capture capture.InfoForAPI, captureID string,
) (*model.DrainCaptureResp, error) {
// Use buffered channel to prevent blocking owner.
done := make(chan error, 1)
Expand Down
28 changes: 14 additions & 14 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ const (

// OpenAPI provides capture APIs.
type OpenAPI struct {
capture capture.Capture
capture capture.InfoForAPI
// use for unit test only
testStatusProvider owner.StatusProvider
}

// NewOpenAPI creates a new OpenAPI.
func NewOpenAPI(c capture.Capture) OpenAPI {
func NewOpenAPI(c capture.InfoForAPI) OpenAPI {
return OpenAPI{capture: c}
}

// NewOpenAPI4Test return a OpenAPI for test
func NewOpenAPI4Test(c capture.Capture, p owner.StatusProvider) OpenAPI {
func NewOpenAPI4Test(c capture.InfoForAPI, p owner.StatusProvider) OpenAPI {
return OpenAPI{capture: c, testStatusProvider: p}
}

Expand Down Expand Up @@ -283,8 +283,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
CAPath: up.SecurityConfig.CAPath,
CertAllowedCN: up.SecurityConfig.CertAllowedCN,
}
err = h.capture.GetEtcdClient().CreateChangefeedInfo(
ctx, upstreamInfo, info, model.DefaultChangeFeedID(changefeedConfig.ID))
err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx, upstreamInfo,
info,
model.DefaultChangeFeedID(changefeedConfig.ID))
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -720,7 +721,7 @@ func (h *OpenAPI) ListCapture(c *gin.Context) {
// @Produce json
// @Success 200,202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/captures/drain [put]
// @Router /api/v1/captures/drain [post]
func (h *OpenAPI) DrainCapture(c *gin.Context) {
var req model.DrainCaptureRequest
if err := c.ShouldBindJSON(&req); err != nil {
Expand Down Expand Up @@ -791,19 +792,18 @@ func (h *OpenAPI) DrainCapture(c *gin.Context) {
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/status [get]
func (h *OpenAPI) ServerStatus(c *gin.Context) {
status := model.ServerStatus{
Version: version.ReleaseVersion,
GitHash: version.GitHash,
Pid: os.Getpid(),
}
info, err := h.capture.Info()
if err != nil {
_ = c.Error(err)
return
}
status := model.ServerStatus{
Version: version.ReleaseVersion,
GitHash: version.GitHash,
Pid: os.Getpid(),
ID: info.ID,
IsOwner: h.capture.IsOwner(),
Liveness: h.capture.Liveness(),
}
status.ID = info.ID
status.IsOwner = h.capture.IsOwner()
c.IndentedJSON(http.StatusOK, status)
}

Expand Down
43 changes: 1 addition & 42 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/golang/mock/gomock"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/capture"
mock_capture "github.com/pingcap/tiflow/cdc/capture/mock"
"github.com/pingcap/tiflow/cdc/model"
mock_owner "github.com/pingcap/tiflow/cdc/owner/mock"
"github.com/pingcap/tiflow/cdc/scheduler"
Expand Down Expand Up @@ -92,7 +91,7 @@ func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureI
return args.Get(0).([]*model.CaptureInfo), args.Error(1)
}

func newRouter(c capture.Capture, p *mockStatusProvider) *gin.Engine {
func newRouter(c *capture.Capture, p *mockStatusProvider) *gin.Engine {
router := gin.New()
RegisterOpenAPIRoutes(router, NewOpenAPI4Test(c, p))
return router
Expand Down Expand Up @@ -746,46 +745,6 @@ func TestServerStatus(t *testing.T) {
require.False(t, resp.IsOwner)
}

func TestServerStatusLiveness(t *testing.T) {
t.Parallel()
// capture is owner
ctrl := gomock.NewController(t)
cp := mock_capture.NewMockCapture(ctrl)
ownerRouter := newRouter(cp, newStatusProvider())
api := testCase{url: "/api/v1/status", method: "GET"}

cp.EXPECT().Info().DoAndReturn(func() (model.CaptureInfo, error) {
return model.CaptureInfo{}, nil
}).AnyTimes()
cp.EXPECT().IsOwner().Return(true).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()

// Alive.
alive := cp.EXPECT().Liveness().DoAndReturn(func() model.Liveness {
return model.LivenessCaptureAlive
})
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
ownerRouter.ServeHTTP(w, req)
require.Equal(t, 200, w.Code)
var resp model.ServerStatus
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.EqualValues(t, model.LivenessCaptureAlive, resp.Liveness)

// Draining the capture.
cp.EXPECT().Liveness().DoAndReturn(func() model.Liveness {
return model.LivenessCaptureStopping
}).After(alive)
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
ownerRouter.ServeHTTP(w, req)
require.Equal(t, 200, w.Code)
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.EqualValues(t, model.LivenessCaptureStopping, resp.Liveness)
}

func TestSetLogLevel(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
func VerifyCreateChangefeedConfig(
ctx context.Context,
changefeedConfig model.ChangefeedConfig,
capture capture.Capture,
capture capture.InfoForAPI,
) (*model.ChangeFeedInfo, error) {
// TODO(dongmen): we should pass ClusterID in ChangefeedConfig in the upcoming future
up := capture.GetUpstreamManager().GetDefaultUpstream()
Expand Down
6 changes: 3 additions & 3 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ import (

// OpenAPIV2 provides CDC v2 APIs
type OpenAPIV2 struct {
capture capture.Capture
capture capture.InfoForAPI
helpers APIV2Helpers
}

// NewOpenAPIV2 creates a new OpenAPIV2.
func NewOpenAPIV2(c capture.Capture) OpenAPIV2 {
func NewOpenAPIV2(c *capture.Capture) OpenAPIV2 {
return OpenAPIV2{c, APIV2HelpersImpl{}}
}

// NewOpenAPIV2ForTest creates a new OpenAPIV2.
func NewOpenAPIV2ForTest(c capture.Capture, h APIV2Helpers) OpenAPIV2 {
func NewOpenAPIV2ForTest(c capture.InfoForAPI, h APIV2Helpers) OpenAPIV2 {
return OpenAPIV2{c, h}
}

Expand Down
Loading