Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

openapi: add source releated API #2055

Merged
merged 20 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
log.L().Warn("using an absolute relay path, relay log can't work when starting multiple relay worker")
}

return c.AdjustCaseSensitive(ctx2, db)
}

// AdjustCaseSensitive adjust CaseSensitive from DB.
func (c *SourceConfig) AdjustCaseSensitive(ctx context.Context, db *sql.DB) (err error) {
caseSensitive, err2 := utils.GetDBCaseSensitive(ctx, db)
if err2 != nil {
return err2
}
c.CaseSensitive = caseSensitive
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down
23 changes: 23 additions & 0 deletions dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/go-mysql-org/go-mysql/mysql"
. "github.com/pingcap/check"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"

"github.com/pingcap/dm/pkg/utils"
)

// do not forget to update this path if the file removed/renamed.
Expand Down Expand Up @@ -312,3 +314,24 @@ func getMockServerIDs(ctx context.Context, db *sql.DB) (map[uint32]struct{}, err
2: {},
}, nil
}

func (t *testConfig) TestAdjustCaseSensitive(c *C) {
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)

db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
mock.ExpectQuery("SELECT @@lower_case_table_names;").
WillReturnRows(sqlmock.NewRows([]string{"@@lower_case_table_names"}).AddRow(utils.LCTableNamesMixed))
mock.ExpectClose()
c.Assert(cfg.AdjustCaseSensitive(context.Background(), db), IsNil)
c.Assert(cfg.CaseSensitive, Equals, false)

db, mock, err = sqlmock.New()
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, IsNil)
mock.ExpectQuery("SELECT @@lower_case_table_names;").
WillReturnRows(sqlmock.NewRows([]string{"@@lower_case_table_names"}).AddRow(utils.LCTableNamesSensitive))
mock.ExpectClose()
c.Assert(cfg.AdjustCaseSensitive(context.Background(), db), IsNil)
c.Assert(cfg.CaseSensitive, Equals, true)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
cfg1.From.User = user
cfg1.From.Password = password
cfg1.RelayDir = "relay-dir"
c.Assert(checkAndAdjustSourceConfig(ctx, cfg1), IsNil) // adjust source config.
cfg2 := cfg1.Clone()
cfg2.SourceID = "mysql-replica-02"

Expand Down
88 changes: 85 additions & 3 deletions dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package master

import (
"context"
"fmt"
"net/http"

"github.com/deepmap/oapi-codegen/pkg/middleware"
"github.com/labstack/echo/v4"
echomiddleware "github.com/labstack/echo/v4/middleware"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/openapi"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -88,17 +90,64 @@ func (s *Server) GetDocHTML(ctx echo.Context) error {

// DMAPICreateSource url is:(POST /api/v1/sources).
func (s *Server) DMAPICreateSource(ctx echo.Context) error {
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
return nil
needRedirect, host, err := s.redirectRequestToLeader(ctx.Request().Context())
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
if needRedirect {
return ctx.Redirect(http.StatusTemporaryRedirect, fmt.Sprintf("http://%s%s", host, ctx.Request().RequestURI))
}

var createSourceReq openapi.Source
if err := ctx.Bind(&createSourceReq); err != nil {
return err
}
cfg := modelToSourceCfg(createSourceReq)
if err := checkAndAdjustSourceConfig(ctx.Request().Context(), cfg); err != nil {
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
return err
}
if err := s.scheduler.AddSourceCfg(cfg); err != nil {
return err
}
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
return ctx.JSON(http.StatusCreated, createSourceReq)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
}

// DMAPIGetSourceList url is:(GET /api/v1/sources).
func (s *Server) DMAPIGetSourceList(ctx echo.Context) error {
return nil
needRedirect, host, err := s.redirectRequestToLeader(ctx.Request().Context())
if err != nil {
return err
}
if needRedirect {
return ctx.Redirect(http.StatusTemporaryRedirect, fmt.Sprintf("http://%s%s", host, ctx.Request().RequestURI))
}

sourceIDS := s.scheduler.GetSourceCfgIDs()
sourceCfgList := make([]*config.SourceConfig, len(sourceIDS))
for idx, sourceID := range sourceIDS {
sourceCfgList[idx] = s.scheduler.GetSourceCfgByID(sourceID)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}
sourceList := make([]openapi.Source, len(sourceCfgList))
for idx, cfg := range sourceCfgList {
sourceList[idx] = sourceCfgToModel(*cfg)
}
resp := openapi.GetSourceListResponse{Total: len(sourceList), Data: sourceList}
return ctx.JSON(http.StatusOK, resp)
}

// DMAPIDeleteSource url is:(DELETE /api/v1/sources).
func (s *Server) DMAPIDeleteSource(ctx echo.Context, sourceName string) error {
return nil
needRedirect, host, err := s.redirectRequestToLeader(ctx.Request().Context())
if err != nil {
return err
}
if needRedirect {
return ctx.Redirect(http.StatusTemporaryRedirect, fmt.Sprintf("http://%s%s", host, ctx.Request().RequestURI))
}
if err := s.scheduler.RemoveSourceCfg(sourceName); err != nil {
return err
}
return ctx.NoContent(http.StatusNoContent)
}

// DMAPIStartRelay url is:(POST /api/v1/sources/{source-id}/relay).
Expand Down Expand Up @@ -154,3 +203,36 @@ func sendHTTPErrorResp(ctx echo.Context, code int, message string) error {
err := openapi.ErrorWithMessage{ErrorMsg: message, ErrorCode: code}
return ctx.JSON(http.StatusBadRequest, err)
}

func sourceCfgToModel(cfg config.SourceConfig) openapi.Source {
// NOTE we don't return SSL cert here, because we don't want to expose it to the user.
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
return openapi.Source{
EnableGtid: cfg.EnableGTID,
Host: cfg.From.Host,
Password: cfg.From.Password,
Port: cfg.From.Port,
SourceName: cfg.SourceID,
User: cfg.From.User,
}
}

func modelToSourceCfg(source openapi.Source) *config.SourceConfig {
cfg := config.NewSourceConfig()
from := config.DBConfig{
Host: source.Host,
Port: source.Port,
User: source.User,
Password: source.Password,
}
if source.Security != nil {
from.Security = &config.Security{
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
SSLCABytes: []byte(source.Security.SslCaContent),
SSLKEYBytes: []byte(source.Security.SslKeyContent),
SSLCertBytes: []byte(source.Security.SslCertContent),
}
}
cfg.From = from
cfg.EnableGTID = source.EnableGtid
cfg.SourceID = source.SourceName
return cfg
}
122 changes: 122 additions & 0 deletions dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,66 @@ package master
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/deepmap/oapi-codegen/pkg/testutil"
"github.com/pingcap/check"
"github.com/tikv/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/openapi"
"github.com/pingcap/dm/pkg/ha"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

var openAPITestSuite = check.Suite(&openAPISuite{})

// some data for test.
var (
source1Name = "mysql-replica-01"
)

type openAPISuite struct {
testT *testing.T

etcdTestCli *clientv3.Client
testEtcdCluster *integration.ClusterV3
workerClients map[string]workerrpc.Client
}

func (t *openAPISuite) SetUpTest(c *check.C) {
t.testEtcdCluster = integration.NewClusterV3(t.testT, &integration.ClusterConfig{Size: 1})
t.etcdTestCli = t.testEtcdCluster.RandClient()
t.workerClients = make(map[string]workerrpc.Client)

c.Assert(ha.ClearTestInfoOperation(t.etcdTestCli), check.IsNil)
}

func setupServer(ctx context.Context, c *check.C) *Server {
// create a new cluster
cfg1 := NewConfig()
c.Assert(cfg1.Parse([]string{"-config=./dm-master.toml"}), check.IsNil)
cfg1.Name = "dm-master-1"
cfg1.DataDir = c.MkDir()
cfg1.MasterAddr = tempurl.Alloc()[len("http://"):]
cfg1.PeerUrls = tempurl.Alloc()
cfg1.AdvertisePeerUrls = cfg1.PeerUrls
cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls)

s1 := NewServer(cfg1)
c.Assert(s1.Start(ctx), check.IsNil)
// wait the first one become the leader
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s1.election.IsLeader()
}), check.IsTrue)

return s1
}

func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
Expand Down Expand Up @@ -80,3 +127,78 @@ func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
c.Assert(needRedirect2, check.Equals, true)
c.Assert(openAPIAddrFromS2, check.Equals, s1.cfg.AdvertiseAddr)
}

func (t *openAPISuite) TestSourceAPI(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s := setupServer(ctx, c)
defer s.Close()

baseURL := "/api/v1/sources"

dbCFG := config.GetDBConfigFromEnv()
source1 := openapi.Source{
SourceName: source1Name,
EnableGtid: false,
Host: dbCFG.Host,
Password: dbCFG.Password,
Port: dbCFG.Port,
User: dbCFG.User,
}
result := testutil.NewRequest().Post(baseURL).WithJsonBody(source1).Go(t.testT, s.echo)
// check http status code
c.Assert(result.Code(), check.Equals, http.StatusCreated)
var resultSource openapi.Source
err := result.UnmarshalBodyToObject(&resultSource)
c.Assert(err, check.IsNil)
c.Assert(resultSource.User, check.Equals, source1.User)
c.Assert(resultSource.Host, check.Equals, source1.Host)
c.Assert(resultSource.Port, check.Equals, source1.Port)
c.Assert(resultSource.Password, check.Equals, source1.Password)
c.Assert(resultSource.EnableGtid, check.Equals, source1.EnableGtid)
c.Assert(resultSource.SourceName, check.Equals, source1.SourceName)

// create source with same name will failed
source2 := source1
result2 := testutil.NewRequest().Post(baseURL).WithJsonBody(source2).Go(t.testT, s.echo)
// check http status code
c.Assert(result2.Code(), check.Equals, http.StatusBadRequest)
var errResp openapi.ErrorWithMessage
err = result2.UnmarshalBodyToObject(&errResp)
c.Assert(err, check.IsNil)
c.Assert(errResp.ErrorCode, check.Equals, int(terror.ErrSchedulerSourceCfgExist.Code()))

// list sources
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
result3 := testutil.NewRequest().Get(baseURL).Go(t.testT, s.echo)
// check http status code
c.Assert(result3.Code(), check.Equals, http.StatusOK)
var resultListSource openapi.GetSourceListResponse
err = result3.UnmarshalBodyToObject(&resultListSource)
c.Assert(err, check.IsNil)
c.Assert(resultListSource.Data, check.HasLen, 1)
c.Assert(resultListSource.Total, check.Equals, 1)
c.Assert(resultListSource.Data[0].SourceName, check.Equals, source1.SourceName)

// delete source
result4 := testutil.NewRequest().Delete(fmt.Sprintf("%s/%s", baseURL, source1.SourceName)).Go(t.testT, s.echo)
// check http status code
c.Assert(result4.Code(), check.Equals, http.StatusNoContent)

// delete again will failed
result5 := testutil.NewRequest().Delete(fmt.Sprintf("%s/%s", baseURL, source1.SourceName)).Go(t.testT, s.echo)
c.Assert(result5.Code(), check.Equals, http.StatusBadRequest)
var errResp2 openapi.ErrorWithMessage
err = result5.UnmarshalBodyToObject(&errResp2)
c.Assert(err, check.IsNil)
c.Assert(errResp2.ErrorCode, check.Equals, int(terror.ErrSchedulerSourceCfgNotExist.Code()))

// list sources
result6 := testutil.NewRequest().Get(baseURL).Go(t.testT, s.echo)
// check http status code
c.Assert(result6.Code(), check.Equals, http.StatusOK)
var resultListSource2 openapi.GetSourceListResponse
err = result6.UnmarshalBodyToObject(&resultListSource2)
c.Assert(err, check.IsNil)
c.Assert(resultListSource2.Data, check.HasLen, 0)
c.Assert(resultListSource2.Total, check.Equals, 0)
}
Loading