Skip to content

Commit

Permalink
Revert "Merge branch 'master' into cli-use-open-api and resolve confl…
Browse files Browse the repository at this point in the history
…icts (#6137)" (#6151)

This reverts commit 1f8d40c.
  • Loading branch information
asddongmen authored Jul 1, 2022
1 parent 1f8d40c commit ef587e6
Show file tree
Hide file tree
Showing 107 changed files with 1,511 additions and 2,716 deletions.
4 changes: 1 addition & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,8 @@ swagger-spec: tools/bin/swag
generate_mock: tools/bin/mockgen
tools/bin/mockgen -source cdc/owner/owner.go -destination cdc/owner/mock/owner_mock.go
tools/bin/mockgen -source cdc/api/v2/api_helpers.go -destination cdc/api/v2/api_helpers_mock.go -package v2
tools/bin/mockgen -source cdc/capture/info_for_api.go -destination cdc/capture/mock/capture_info_mock.go
tools/bin/mockgen -source pkg/etcd/client_for_api.go -destination pkg/etcd/mock/etcd_client_mock.go
tools/bin/mockgen -source cdc/processor/manager.go -destination cdc/processor/mock/manager_mock.go
tools/bin/mockgen -source cdc/capture/capture.go -destination cdc/capture/mock/capture_mock.go

clean:
go clean -i ./...
rm -rf *.out
Expand Down
4 changes: 2 additions & 2 deletions cdc/api/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func ErrorHandleMiddleware() gin.HandlerFunc {

// ForwardToOwnerMiddleware forward an request to owner if current server
// is not owner, or handle it locally.
func ForwardToOwnerMiddleware(p capture.Capture) gin.HandlerFunc {
func ForwardToOwnerMiddleware(p capture.InfoForAPI) gin.HandlerFunc {
return func(ctx *gin.Context) {
if !p.IsOwner() {
api.ForwardToOwner(ctx, p)
Expand All @@ -97,7 +97,7 @@ func ForwardToOwnerMiddleware(p capture.Capture) gin.HandlerFunc {
}

// CheckServerReadyMiddleware checks if the server is ready
func CheckServerReadyMiddleware(capture capture.Capture) gin.HandlerFunc {
func CheckServerReadyMiddleware(capture capture.InfoForAPI) gin.HandlerFunc {
return func(c *gin.Context) {
if capture.IsReady() {
c.Next()
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/middleware/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ import (
"github.com/stretchr/testify/require"
)

type testCapture struct {
capture.Capture
type testCaptureInfoProvider struct {
capture.InfoForAPI
ready bool
}

func (c *testCapture) IsReady() bool {
func (c *testCaptureInfoProvider) IsReady() bool {
return c.ready
}

func TestCheckServerReadyMiddleware(t *testing.T) {
capture := &testCapture{ready: false}
capture := &testCaptureInfoProvider{ready: false}
router := gin.New()
router.Use(CheckServerReadyMiddleware(capture))
router.Use(LogMiddleware())
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ func (c ChangefeedResp) MarshalJSON() ([]byte, error) {

// ownerAPI provides owner APIs.
type ownerAPI struct {
capture capture.Capture
capture *capture.Capture
}

// RegisterOwnerAPIRoutes registers routes for owner APIs.
func RegisterOwnerAPIRoutes(router *gin.Engine, capture capture.Capture) {
func RegisterOwnerAPIRoutes(router *gin.Engine, capture *capture.Capture) {
ownerAPI := ownerAPI{capture: capture}
owner := router.Group("/capture/owner")

Expand Down Expand Up @@ -228,13 +228,13 @@ func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Reques
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cfInfo, err := h.capture.GetEtcdClient().GetChangeFeedInfo(ctx, changefeedID)
cfInfo, err := h.capture.EtcdClient.GetChangeFeedInfo(ctx, changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID))
return
}
cfStatus, _, err := h.capture.GetEtcdClient().GetChangeFeedStatus(ctx, changefeedID)
cfStatus, _, err := h.capture.EtcdClient.GetChangeFeedStatus(ctx, changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
api.WriteError(w, http.StatusBadRequest, err)
return
Expand Down
6 changes: 3 additions & 3 deletions cdc/api/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ type status struct {
}

type statusAPI struct {
capture capture.Capture
capture *capture.Capture
}

// RegisterStatusAPIRoutes registers routes for status.
func RegisterStatusAPIRoutes(router *gin.Engine, capture capture.Capture) {
func RegisterStatusAPIRoutes(router *gin.Engine, capture *capture.Capture) {
statusAPI := statusAPI{capture: capture}
router.GET("/status", gin.WrapF(statusAPI.handleStatus))
router.GET("/debug/info", gin.WrapF(statusAPI.handleDebugInfo))
Expand All @@ -65,7 +65,7 @@ func (h *statusAPI) handleDebugInfo(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
h.capture.WriteDebugInfo(ctx, w)
fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n")
h.writeEtcdInfo(ctx, h.capture.GetEtcdClient(), w)
h.writeEtcdInfo(ctx, h.capture.EtcdClient, w)
}

func (h *statusAPI) handleStatus(w http.ResponseWriter, req *http.Request) {
Expand Down
10 changes: 5 additions & 5 deletions cdc/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func WriteData(w http.ResponseWriter, data interface{}) {

// HandleOwnerJob enqueue the admin job
func HandleOwnerJob(
ctx context.Context, capture capture.Capture, job model.AdminJob,
ctx context.Context, capture capture.InfoForAPI, job model.AdminJob,
) error {
// Use buffered channel to prevent blocking owner from happening.
done := make(chan error, 1)
Expand All @@ -113,7 +113,7 @@ func HandleOwnerJob(

// HandleOwnerBalance balance the changefeed tables
func HandleOwnerBalance(
ctx context.Context, capture capture.Capture, changefeedID model.ChangeFeedID,
ctx context.Context, capture capture.InfoForAPI, changefeedID model.ChangeFeedID,
) error {
// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
Expand All @@ -132,7 +132,7 @@ func HandleOwnerBalance(

// HandleOwnerScheduleTable schedule tables
func HandleOwnerScheduleTable(
ctx context.Context, capture capture.Capture,
ctx context.Context, capture capture.InfoForAPI,
changefeedID model.ChangeFeedID, captureID string, tableID int64,
) error {
// Use buffered channel to prevent blocking owner.
Expand All @@ -151,7 +151,7 @@ func HandleOwnerScheduleTable(
}

// ForwardToOwner forwards an request to the owner
func ForwardToOwner(c *gin.Context, p capture.Capture) {
func ForwardToOwner(c *gin.Context, p capture.InfoForAPI) {
ctx := c.Request.Context()
// every request can only forward to owner one time
if len(c.GetHeader(forwardFromCapture)) != 0 {
Expand Down Expand Up @@ -233,7 +233,7 @@ func ForwardToOwner(c *gin.Context, p capture.Capture) {

// HandleOwnerDrainCapture schedule drain the target capture
func HandleOwnerDrainCapture(
ctx context.Context, capture capture.Capture, captureID string,
ctx context.Context, capture capture.InfoForAPI, captureID string,
) (*model.DrainCaptureResp, error) {
// Use buffered channel to prevent blocking owner.
done := make(chan error, 1)
Expand Down
28 changes: 14 additions & 14 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ const (

// OpenAPI provides capture APIs.
type OpenAPI struct {
capture capture.Capture
capture capture.InfoForAPI
// use for unit test only
testStatusProvider owner.StatusProvider
}

// NewOpenAPI creates a new OpenAPI.
func NewOpenAPI(c capture.Capture) OpenAPI {
func NewOpenAPI(c capture.InfoForAPI) OpenAPI {
return OpenAPI{capture: c}
}

// NewOpenAPI4Test return a OpenAPI for test
func NewOpenAPI4Test(c capture.Capture, p owner.StatusProvider) OpenAPI {
func NewOpenAPI4Test(c capture.InfoForAPI, p owner.StatusProvider) OpenAPI {
return OpenAPI{capture: c, testStatusProvider: p}
}

Expand Down Expand Up @@ -283,8 +283,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
CAPath: up.SecurityConfig.CAPath,
CertAllowedCN: up.SecurityConfig.CertAllowedCN,
}
err = h.capture.GetEtcdClient().CreateChangefeedInfo(
ctx, upstreamInfo, info, model.DefaultChangeFeedID(changefeedConfig.ID))
err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx, upstreamInfo,
info,
model.DefaultChangeFeedID(changefeedConfig.ID))
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -720,7 +721,7 @@ func (h *OpenAPI) ListCapture(c *gin.Context) {
// @Produce json
// @Success 200,202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/captures/drain [put]
// @Router /api/v1/captures/drain [post]
func (h *OpenAPI) DrainCapture(c *gin.Context) {
var req model.DrainCaptureRequest
if err := c.ShouldBindJSON(&req); err != nil {
Expand Down Expand Up @@ -791,19 +792,18 @@ func (h *OpenAPI) DrainCapture(c *gin.Context) {
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/status [get]
func (h *OpenAPI) ServerStatus(c *gin.Context) {
status := model.ServerStatus{
Version: version.ReleaseVersion,
GitHash: version.GitHash,
Pid: os.Getpid(),
}
info, err := h.capture.Info()
if err != nil {
_ = c.Error(err)
return
}
status := model.ServerStatus{
Version: version.ReleaseVersion,
GitHash: version.GitHash,
Pid: os.Getpid(),
ID: info.ID,
IsOwner: h.capture.IsOwner(),
Liveness: h.capture.Liveness(),
}
status.ID = info.ID
status.IsOwner = h.capture.IsOwner()
c.IndentedJSON(http.StatusOK, status)
}

Expand Down
43 changes: 1 addition & 42 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/golang/mock/gomock"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/capture"
mock_capture "github.com/pingcap/tiflow/cdc/capture/mock"
"github.com/pingcap/tiflow/cdc/model"
mock_owner "github.com/pingcap/tiflow/cdc/owner/mock"
"github.com/pingcap/tiflow/cdc/scheduler"
Expand Down Expand Up @@ -92,7 +91,7 @@ func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureI
return args.Get(0).([]*model.CaptureInfo), args.Error(1)
}

func newRouter(c capture.Capture, p *mockStatusProvider) *gin.Engine {
func newRouter(c *capture.Capture, p *mockStatusProvider) *gin.Engine {
router := gin.New()
RegisterOpenAPIRoutes(router, NewOpenAPI4Test(c, p))
return router
Expand Down Expand Up @@ -746,46 +745,6 @@ func TestServerStatus(t *testing.T) {
require.False(t, resp.IsOwner)
}

func TestServerStatusLiveness(t *testing.T) {
t.Parallel()
// capture is owner
ctrl := gomock.NewController(t)
cp := mock_capture.NewMockCapture(ctrl)
ownerRouter := newRouter(cp, newStatusProvider())
api := testCase{url: "/api/v1/status", method: "GET"}

cp.EXPECT().Info().DoAndReturn(func() (model.CaptureInfo, error) {
return model.CaptureInfo{}, nil
}).AnyTimes()
cp.EXPECT().IsOwner().Return(true).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()

// Alive.
alive := cp.EXPECT().Liveness().DoAndReturn(func() model.Liveness {
return model.LivenessCaptureAlive
})
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
ownerRouter.ServeHTTP(w, req)
require.Equal(t, 200, w.Code)
var resp model.ServerStatus
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.EqualValues(t, model.LivenessCaptureAlive, resp.Liveness)

// Draining the capture.
cp.EXPECT().Liveness().DoAndReturn(func() model.Liveness {
return model.LivenessCaptureStopping
}).After(alive)
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
ownerRouter.ServeHTTP(w, req)
require.Equal(t, 200, w.Code)
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.EqualValues(t, model.LivenessCaptureStopping, resp.Liveness)
}

func TestSetLogLevel(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
func VerifyCreateChangefeedConfig(
ctx context.Context,
changefeedConfig model.ChangefeedConfig,
capture capture.Capture,
capture capture.InfoForAPI,
) (*model.ChangeFeedInfo, error) {
// TODO(dongmen): we should pass ClusterID in ChangefeedConfig in the upcoming future
up := capture.GetUpstreamManager().GetDefaultUpstream()
Expand Down
6 changes: 3 additions & 3 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ import (

// OpenAPIV2 provides CDC v2 APIs
type OpenAPIV2 struct {
capture capture.Capture
capture capture.InfoForAPI
helpers APIV2Helpers
}

// NewOpenAPIV2 creates a new OpenAPIV2.
func NewOpenAPIV2(c capture.Capture) OpenAPIV2 {
func NewOpenAPIV2(c *capture.Capture) OpenAPIV2 {
return OpenAPIV2{c, APIV2HelpersImpl{}}
}

// NewOpenAPIV2ForTest creates a new OpenAPIV2.
func NewOpenAPIV2ForTest(c capture.Capture, h APIV2Helpers) OpenAPIV2 {
func NewOpenAPIV2ForTest(c capture.InfoForAPI, h APIV2Helpers) OpenAPIV2 {
return OpenAPIV2{c, h}
}

Expand Down
Loading

0 comments on commit ef587e6

Please sign in to comment.