Skip to content

Commit

Permalink
db(dm): use net.JoinHostPort to generate host-port part of URI (#6248)
Browse files Browse the repository at this point in the history
close #6249
  • Loading branch information
D3Hunter authored Jul 19, 2022
1 parent 3405b79 commit b4bbb86
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 68 deletions.
8 changes: 5 additions & 3 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
ticonfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
tidbkv "github.com/pingcap/tidb/kv"
timeta "github.com/pingcap/tidb/meta"
Expand All @@ -31,11 +32,12 @@ import (
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"

"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func TestSchema(t *testing.T) {
Expand Down Expand Up @@ -898,7 +900,7 @@ func getAllHistoryDDLJob(storage tidbkv.Storage) ([]*timodel.Job, error) {
defer txn.Rollback() //nolint:errcheck
txnMeta := timeta.NewMeta(txn)

jobs, err := txnMeta.GetAllHistoryDDLJobs()
jobs, err := ddl.GetAllHistoryDDLJobs(txnMeta)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
5 changes: 3 additions & 2 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

ticonfig "github.com/pingcap/tidb/config"
tiddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
timeta "github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewSchemaTestHelper(t *testing.T) *SchemaTestHelper {
// DDL2Job executes the DDL stmt and returns the DDL job
func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
s.tk.MustExec(ddl)
jobs, err := s.GetCurrentMeta().GetLastNHistoryDDLJobs(1)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
require.Nil(s.t, err)
require.Len(s.t, jobs, 1)
return jobs[0]
Expand All @@ -72,7 +73,7 @@ func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
// DDL statements.
func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job {
s.tk.MustExec(ddl)
jobs, err := s.GetCurrentMeta().GetLastNHistoryDDLJobs(jobCnt)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), jobCnt)
require.Nil(s.t, err)
require.Len(s.t, jobs, jobCnt)
return jobs
Expand Down
24 changes: 16 additions & 8 deletions cdc/redo/writer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,23 @@ package writer
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/pkg/fsutil"
"github.com/pingcap/tiflow/pkg/uuid"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
)

func TestWriterWrite(t *testing.T) {
Expand Down Expand Up @@ -285,20 +287,26 @@ func TestNewWriter(t *testing.T) {
_, err := NewWriter(context.Background(), nil)
require.NotNil(t, err)

s3URI, err := url.Parse("s3://logbucket/test-changefeed?endpoint=http://111/")
require.Nil(t, err)

storageDir := t.TempDir()
dir := t.TempDir()

uuidGen := uuid.NewConstGenerator("const-uuid")
w, err := NewWriter(context.Background(), &FileWriterConfig{
Dir: "sdfsf",
S3Storage: true,
S3URI: *s3URI,
S3Storage: false,
},
WithUUIDGenerator(func() uuid.Generator { return uuidGen }),
)
require.Nil(t, err)
backend := &backuppb.StorageBackend{
Backend: &backuppb.StorageBackend_Local{Local: &backuppb.Local{Path: storageDir}},
}
localStorage, err := storage.New(context.Background(), backend, &storage.ExternalStorageOptions{
SendCredentials: false,
HTTPClient: nil,
})
w.storage = localStorage
require.Nil(t, err)
err = w.Close()
require.Nil(t, err)
require.False(t, w.IsRunning())
Expand Down
5 changes: 4 additions & 1 deletion dm/dm/master/openapi_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httputil"
"strconv"

ginmiddleware "github.com/deepmap/oapi-codegen/pkg/gin-middleware"
"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -119,7 +121,8 @@ func (s *Server) GetDocJSON(c *gin.Context) {
if useTLS.Load() {
protocol = "https"
}
masterURL = fmt.Sprintf("%s://%s:%d", protocol, masterTopos[0].Host, masterTopos[0].Port)
hostPort := net.JoinHostPort(masterTopos[0].Host, strconv.Itoa(masterTopos[0].Port))
masterURL = fmt.Sprintf("%s://%s", protocol, hostPort)
}
swagger, err := openapi.GetSwagger()
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions dm/pkg/conn/basedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"fmt"
"net"
"net/url"
"strconv"
"sync"
Expand Down Expand Up @@ -59,8 +60,9 @@ func init() {
func (d *DefaultDBProviderImpl) Apply(config *config.DBConfig) (*BaseDB, error) {
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
// https://github.com/go-sql-driver/mysql#maxallowedpacket
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=0",
config.User, config.Password, config.Host, config.Port)
hostPort := net.JoinHostPort(config.Host, strconv.Itoa(config.Port))
dsn := fmt.Sprintf("%s:%s@tcp(%s)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=0",
config.User, config.Password, hostPort)

doFuncInClose := func() {}
if config.Security != nil {
Expand Down
13 changes: 7 additions & 6 deletions dm/pkg/parser/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,14 @@ func FetchDDLTables(schema string, stmt ast.StmtNode, flavor utils.LowerCaseTabl
}

// special cases: schema related SQLs doesn't have tableName
// todo: pass .O or .L of table name depends on flavor
switch v := stmt.(type) {
case *ast.AlterDatabaseStmt:
return []*filter.Table{genTableName(v.Name, "")}, nil
return []*filter.Table{genTableName(v.Name.O, "")}, nil
case *ast.CreateDatabaseStmt:
return []*filter.Table{genTableName(v.Name, "")}, nil
return []*filter.Table{genTableName(v.Name.O, "")}, nil
case *ast.DropDatabaseStmt:
return []*filter.Table{genTableName(v.Name, "")}, nil
return []*filter.Table{genTableName(v.Name.O, "")}, nil
}

e := &tableNameExtractor{
Expand Down Expand Up @@ -167,11 +168,11 @@ func RenameDDLTable(stmt ast.StmtNode, targetTables []*filter.Table) (string, er

switch v := stmt.(type) {
case *ast.AlterDatabaseStmt:
v.Name = targetTables[0].Schema
v.Name = model.NewCIStr(targetTables[0].Schema)
case *ast.CreateDatabaseStmt:
v.Name = targetTables[0].Schema
v.Name = model.NewCIStr(targetTables[0].Schema)
case *ast.DropDatabaseStmt:
v.Name = targetTables[0].Schema
v.Name = model.NewCIStr(targetTables[0].Schema)
default:
visitor := &tableRenameVisitor{
targetNames: targetTables,
Expand Down
48 changes: 34 additions & 14 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (tr *Tracker) Init(
// TiDB will unconditionally create an empty "test" schema.
// This interferes with MySQL/MariaDB upstream which such schema does not
// exist by default. So we need to drop it first.
err = dom.DDL().DropSchema(se, model.NewCIStr("test"))
err = dropDatabase(dom, se, "test")
if err != nil {
return err
}
Expand Down Expand Up @@ -398,19 +398,26 @@ func IsTableNotExists(err error) bool {
func (tr *Tracker) Reset() error {
tr.se.SetValue(sessionctx.QueryString, "skip")
allDBs := tr.dom.InfoSchema().AllSchemaNames()
ddl := tr.dom.DDL()
for _, db := range allDBs {
dbName := model.NewCIStr(db)
if filter.IsSystemSchema(dbName.L) {
continue
}
if err := ddl.DropSchema(tr.se, dbName); err != nil {
if err := dropDatabase(tr.dom, tr.se, dbName.L); err != nil {
return err
}
}
return nil
}

func dropDatabase(dom *domain.Domain, se session.Session, db string) error {
stmt := &ast.DropDatabaseStmt{
Name: model.NewCIStr(db),
IfExists: true,
}
return dom.DDL().DropSchema(se, stmt)
}

// Close close a tracker.
func (tr *Tracker) Close() error {
if tr == nil {
Expand Down Expand Up @@ -442,21 +449,30 @@ func (tr *Tracker) Close() error {
// DropTable drops a table from this tracker.
func (tr *Tracker) DropTable(table *filter.Table) error {
tr.se.SetValue(sessionctx.QueryString, "skip")
tableIdent := ast.Ident{
Schema: model.NewCIStr(table.Schema),
Name: model.NewCIStr(table.Name),
}
return tr.dom.DDL().DropTable(tr.se, tableIdent)
stmt := &ast.DropTableStmt{
Tables: []*ast.TableName{
{
Schema: model.NewCIStr(table.Schema),
Name: model.NewCIStr(table.Name),
},
},
IfExists: true,
}
return tr.dom.DDL().DropTable(tr.se, stmt)
}

// DropIndex drops an index from this tracker.
func (tr *Tracker) DropIndex(table *filter.Table, index string) error {
tr.se.SetValue(sessionctx.QueryString, "skip")
tableIdent := ast.Ident{
Schema: model.NewCIStr(table.Schema),
Name: model.NewCIStr(table.Name),
}
return tr.dom.DDL().DropIndex(tr.se, tableIdent, model.NewCIStr(index), true)
stmt := &ast.DropIndexStmt{
Table: &ast.TableName{
Schema: model.NewCIStr(table.Schema),
Name: model.NewCIStr(table.Name),
},
IndexName: index,
IfExists: true,
}
return tr.dom.DDL().DropIndex(tr.se, stmt)
}

// CreateSchemaIfNotExists creates a SCHEMA of the given name if it did not exist.
Expand All @@ -466,7 +482,11 @@ func (tr *Tracker) CreateSchemaIfNotExists(db string) error {
if tr.dom.InfoSchema().SchemaExists(dbName) {
return nil
}
return tr.dom.DDL().CreateSchema(tr.se, dbName, nil, nil)
stmt := &ast.CreateDatabaseStmt{
Name: dbName,
IfNotExists: true,
}
return tr.dom.DDL().CreateSchema(tr.se, stmt)
}

// cloneTableInfo creates a clone of the TableInfo.
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/utils/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func ParseTimeZone(s string) (*time.Location, error) {
// The time zone's value should in [-12:59,+14:00].
// See: https://dev.mysql.com/doc/refman/8.0/en/time-zone-support.html#time-zone-variables
if strings.HasPrefix(s, "+") || strings.HasPrefix(s, "-") {
d, err := types.ParseDuration(nil, s[1:], 0)
d, _, err := types.ParseDuration(nil, s[1:], 0)
if err == nil {
if s[0] == '-' {
if d.Duration > 12*time.Hour+59*time.Minute {
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/s3_dumpling_lighting/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ db="s3_dumpling_lightning"
db1="s3_dumpling_lightning1"
tb="t"
tb1="t1"
S3_DIR="s3://dmbucket/dump?region=us-west-2\&endpoint=http://127.0.0.1:8688\&access_key=s3accesskey\&secret_access_key=s3secretkey\&force_path_style=true"
S3_DIR="s3://dmbucket/dump?region=us-east-1\&endpoint=http://127.0.0.1:8688\&access_key=s3accesskey\&secret_access_key=s3secretkey\&force_path_style=true"
LOCAL_TEST_DIR="./dumpdata"

# s3 config
Expand Down
19 changes: 9 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424
github.com/getkin/kin-openapi v0.80.0
github.com/gin-gonic/gin v1.7.4
github.com/go-mysql-org/go-mysql v1.4.1-0.20220221114137-89145541e0d4
github.com/go-mysql-org/go-mysql v1.6.1-0.20220718092400-c855c26b37bd
github.com/go-sql-driver/mysql v1.6.0
github.com/gogo/gateway v1.1.0
github.com/gogo/protobuf v1.3.2
Expand All @@ -51,11 +51,11 @@ require (
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167
github.com/pingcap/kvproto v0.0.0-20220705090230-a5d4ffd2ba33
github.com/pingcap/log v1.1.0
github.com/pingcap/tidb v1.1.0-beta.0.20220622125636-a2fe74fc92ed
github.com/pingcap/tidb v1.1.0-beta.0.20220713062705-50437e1d4087
github.com/pingcap/tidb-tools v6.0.1-0.20220516050036-b3ea358e374a+incompatible
github.com/pingcap/tidb/parser v0.0.0-20220622125636-a2fe74fc92ed
github.com/pingcap/tidb/parser v0.0.0-20220713065705-0e13d5d00990
github.com/prometheus/client_golang v1.12.2
github.com/prometheus/client_model v0.2.0
github.com/r3labs/diff v1.1.0
Expand All @@ -69,7 +69,7 @@ require (
github.com/swaggo/gin-swagger v1.2.0
github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/tikv/client-go/v2 v2.0.1-0.20220613112734-be31f33ba03b
github.com/tikv/client-go/v2 v2.0.1-0.20220627063500-947d923945fd
github.com/tikv/pd v1.1.0-beta.0.20220303060546-3695d8164800
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/tinylib/msgp v1.1.6
Expand All @@ -88,13 +88,13 @@ require (
go.uber.org/multierr v1.8.0
go.uber.org/ratelimit v0.2.0
go.uber.org/zap v1.21.0
golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5
golang.org/x/exp v0.0.0-20220428152302-39d4317da171
golang.org/x/net v0.0.0-20220516155154-20f960328961
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a
golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664
golang.org/x/text v0.3.7
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
golang.org/x/tools v0.1.10 // indirect
golang.org/x/tools v0.1.11 // indirect
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3
google.golang.org/grpc v1.46.2
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -218,7 +218,7 @@ require (
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 // indirect
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 // indirect
github.com/pingcap/tipb v0.0.0-20220602075447-4847c5d68e73 // indirect
github.com/pingcap/tipb v0.0.0-20220706024432-7be3cc83a7d5 // indirect
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -229,7 +229,6 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.22.4 // indirect
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
Expand Down
Loading

0 comments on commit b4bbb86

Please sign in to comment.