Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db(dm): use net.JoinHostPort to generate host-port part of URI #6248

Merged
merged 17 commits into from
Jul 19, 2022
Merged
8 changes: 5 additions & 3 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
ticonfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
tidbkv "github.com/pingcap/tidb/kv"
timeta "github.com/pingcap/tidb/meta"
Expand All @@ -31,11 +32,12 @@ import (
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"

"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func TestSchema(t *testing.T) {
Expand Down Expand Up @@ -898,7 +900,7 @@ func getAllHistoryDDLJob(storage tidbkv.Storage) ([]*timodel.Job, error) {
defer txn.Rollback() //nolint:errcheck
txnMeta := timeta.NewMeta(txn)

jobs, err := txnMeta.GetAllHistoryDDLJobs()
jobs, err := ddl.GetAllHistoryDDLJobs(txnMeta)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
5 changes: 3 additions & 2 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

ticonfig "github.com/pingcap/tidb/config"
tiddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
timeta "github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewSchemaTestHelper(t *testing.T) *SchemaTestHelper {
// DDL2Job executes the DDL stmt and returns the DDL job
func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
s.tk.MustExec(ddl)
jobs, err := s.GetCurrentMeta().GetLastNHistoryDDLJobs(1)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems more changes are introduced in this PR. Are they related to the IPv6 issue or they are fixed here simply because they violate the integration tests or some other stuffs due to some upgrades.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because dependency is updated

require.Nil(s.t, err)
require.Len(s.t, jobs, 1)
return jobs[0]
Expand All @@ -72,7 +73,7 @@ func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
// DDL statements.
func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job {
s.tk.MustExec(ddl)
jobs, err := s.GetCurrentMeta().GetLastNHistoryDDLJobs(jobCnt)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), jobCnt)
require.Nil(s.t, err)
require.Len(s.t, jobs, jobCnt)
return jobs
Expand Down
24 changes: 16 additions & 8 deletions cdc/redo/writer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,23 @@ package writer
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/pkg/fsutil"
"github.com/pingcap/tiflow/pkg/uuid"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
)

func TestWriterWrite(t *testing.T) {
Expand Down Expand Up @@ -285,20 +287,26 @@ func TestNewWriter(t *testing.T) {
_, err := NewWriter(context.Background(), nil)
require.NotNil(t, err)

s3URI, err := url.Parse("s3://logbucket/test-changefeed?endpoint=http://111/")
require.Nil(t, err)

storageDir := t.TempDir()
dir := t.TempDir()

uuidGen := uuid.NewConstGenerator("const-uuid")
w, err := NewWriter(context.Background(), &FileWriterConfig{
Dir: "sdfsf",
S3Storage: true,
S3URI: *s3URI,
S3Storage: false,
},
WithUUIDGenerator(func() uuid.Generator { return uuidGen }),
)
require.Nil(t, err)
backend := &backuppb.StorageBackend{
Backend: &backuppb.StorageBackend_Local{Local: &backuppb.Local{Path: storageDir}},
}
localStorage, err := storage.New(context.Background(), backend, &storage.ExternalStorageOptions{
SendCredentials: false,
HTTPClient: nil,
})
w.storage = localStorage
require.Nil(t, err)
err = w.Close()
require.Nil(t, err)
require.False(t, w.IsRunning())
Expand Down
5 changes: 4 additions & 1 deletion dm/dm/master/openapi_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httputil"
"strconv"

ginmiddleware "github.com/deepmap/oapi-codegen/pkg/gin-middleware"
"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -119,7 +121,8 @@ func (s *Server) GetDocJSON(c *gin.Context) {
if useTLS.Load() {
protocol = "https"
}
masterURL = fmt.Sprintf("%s://%s:%d", protocol, masterTopos[0].Host, masterTopos[0].Port)
hostPort := net.JoinHostPort(masterTopos[0].Host, strconv.Itoa(masterTopos[0].Port))
masterURL = fmt.Sprintf("%s://%s", protocol, hostPort)
}
swagger, err := openapi.GetSwagger()
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions dm/pkg/conn/basedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"fmt"
"net"
"net/url"
"strconv"
"sync"
Expand Down Expand Up @@ -59,8 +60,9 @@ func init() {
func (d *DefaultDBProviderImpl) Apply(config *config.DBConfig) (*BaseDB, error) {
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
// https://github.com/go-sql-driver/mysql#maxallowedpacket
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=0",
config.User, config.Password, config.Host, config.Port)
hostPort := net.JoinHostPort(config.Host, strconv.Itoa(config.Port))
dsn := fmt.Sprintf("%s:%s@tcp(%s)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=0",
config.User, config.Password, hostPort)

doFuncInClose := func() {}
if config.Security != nil {
Expand Down
13 changes: 7 additions & 6 deletions dm/pkg/parser/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,14 @@ func FetchDDLTables(schema string, stmt ast.StmtNode, flavor utils.LowerCaseTabl
}

// special cases: schema related SQLs doesn't have tableName
// todo: pass .O or .L of table name depends on flavor
switch v := stmt.(type) {
case *ast.AlterDatabaseStmt:
return []*filter.Table{genTableName(v.Name, "")}, nil
return []*filter.Table{genTableName(v.Name.O, "")}, nil
case *ast.CreateDatabaseStmt:
return []*filter.Table{genTableName(v.Name, "")}, nil
return []*filter.Table{genTableName(v.Name.O, "")}, nil
case *ast.DropDatabaseStmt:
return []*filter.Table{genTableName(v.Name, "")}, nil
return []*filter.Table{genTableName(v.Name.O, "")}, nil
}

e := &tableNameExtractor{
Expand Down Expand Up @@ -167,11 +168,11 @@ func RenameDDLTable(stmt ast.StmtNode, targetTables []*filter.Table) (string, er

switch v := stmt.(type) {
case *ast.AlterDatabaseStmt:
v.Name = targetTables[0].Schema
v.Name = model.NewCIStr(targetTables[0].Schema)
case *ast.CreateDatabaseStmt:
v.Name = targetTables[0].Schema
v.Name = model.NewCIStr(targetTables[0].Schema)
case *ast.DropDatabaseStmt:
v.Name = targetTables[0].Schema
v.Name = model.NewCIStr(targetTables[0].Schema)
default:
visitor := &tableRenameVisitor{
targetNames: targetTables,
Expand Down
48 changes: 34 additions & 14 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (tr *Tracker) Init(
// TiDB will unconditionally create an empty "test" schema.
// This interferes with MySQL/MariaDB upstream which such schema does not
// exist by default. So we need to drop it first.
err = dom.DDL().DropSchema(se, model.NewCIStr("test"))
err = dropDatabase(dom, se, "test")
if err != nil {
return err
}
Expand Down Expand Up @@ -398,19 +398,26 @@ func IsTableNotExists(err error) bool {
func (tr *Tracker) Reset() error {
tr.se.SetValue(sessionctx.QueryString, "skip")
allDBs := tr.dom.InfoSchema().AllSchemaNames()
ddl := tr.dom.DDL()
for _, db := range allDBs {
dbName := model.NewCIStr(db)
if filter.IsSystemSchema(dbName.L) {
continue
}
if err := ddl.DropSchema(tr.se, dbName); err != nil {
if err := dropDatabase(tr.dom, tr.se, dbName.L); err != nil {
return err
}
}
return nil
}

func dropDatabase(dom *domain.Domain, se session.Session, db string) error {
stmt := &ast.DropDatabaseStmt{
Name: model.NewCIStr(db),
IfExists: true,
}
return dom.DDL().DropSchema(se, stmt)
}

// Close close a tracker.
func (tr *Tracker) Close() error {
if tr == nil {
Expand Down Expand Up @@ -442,21 +449,30 @@ func (tr *Tracker) Close() error {
// DropTable drops a table from this tracker.
func (tr *Tracker) DropTable(table *filter.Table) error {
tr.se.SetValue(sessionctx.QueryString, "skip")
tableIdent := ast.Ident{
Schema: model.NewCIStr(table.Schema),
Name: model.NewCIStr(table.Name),
}
return tr.dom.DDL().DropTable(tr.se, tableIdent)
stmt := &ast.DropTableStmt{
Tables: []*ast.TableName{
{
Schema: model.NewCIStr(table.Schema),
Name: model.NewCIStr(table.Name),
},
},
IfExists: true,
}
return tr.dom.DDL().DropTable(tr.se, stmt)
}

// DropIndex drops an index from this tracker.
func (tr *Tracker) DropIndex(table *filter.Table, index string) error {
tr.se.SetValue(sessionctx.QueryString, "skip")
tableIdent := ast.Ident{
Schema: model.NewCIStr(table.Schema),
Name: model.NewCIStr(table.Name),
}
return tr.dom.DDL().DropIndex(tr.se, tableIdent, model.NewCIStr(index), true)
stmt := &ast.DropIndexStmt{
Table: &ast.TableName{
Schema: model.NewCIStr(table.Schema),
Name: model.NewCIStr(table.Name),
},
IndexName: index,
IfExists: true,
}
return tr.dom.DDL().DropIndex(tr.se, stmt)
}

// CreateSchemaIfNotExists creates a SCHEMA of the given name if it did not exist.
Expand All @@ -466,7 +482,11 @@ func (tr *Tracker) CreateSchemaIfNotExists(db string) error {
if tr.dom.InfoSchema().SchemaExists(dbName) {
return nil
}
return tr.dom.DDL().CreateSchema(tr.se, dbName, nil, nil)
stmt := &ast.CreateDatabaseStmt{
Name: dbName,
IfNotExists: true,
}
return tr.dom.DDL().CreateSchema(tr.se, stmt)
}

// cloneTableInfo creates a clone of the TableInfo.
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/utils/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func ParseTimeZone(s string) (*time.Location, error) {
// The time zone's value should in [-12:59,+14:00].
// See: https://dev.mysql.com/doc/refman/8.0/en/time-zone-support.html#time-zone-variables
if strings.HasPrefix(s, "+") || strings.HasPrefix(s, "-") {
d, err := types.ParseDuration(nil, s[1:], 0)
d, _, err := types.ParseDuration(nil, s[1:], 0)
if err == nil {
if s[0] == '-' {
if d.Duration > 12*time.Hour+59*time.Minute {
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/s3_dumpling_lighting/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ db="s3_dumpling_lightning"
db1="s3_dumpling_lightning1"
tb="t"
tb1="t1"
S3_DIR="s3://dmbucket/dump?region=us-west-2\&endpoint=http://127.0.0.1:8688\&access_key=s3accesskey\&secret_access_key=s3secretkey\&force_path_style=true"
S3_DIR="s3://dmbucket/dump?region=us-east-1\&endpoint=http://127.0.0.1:8688\&access_key=s3accesskey\&secret_access_key=s3secretkey\&force_path_style=true"
LOCAL_TEST_DIR="./dumpdata"

# s3 config
Expand Down
19 changes: 9 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424
github.com/getkin/kin-openapi v0.80.0
github.com/gin-gonic/gin v1.7.4
github.com/go-mysql-org/go-mysql v1.4.1-0.20220221114137-89145541e0d4
github.com/go-mysql-org/go-mysql v1.6.1-0.20220718092400-c855c26b37bd
github.com/go-sql-driver/mysql v1.6.0
github.com/gogo/gateway v1.1.0
github.com/gogo/protobuf v1.3.2
Expand All @@ -51,11 +51,11 @@ require (
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167
github.com/pingcap/kvproto v0.0.0-20220705090230-a5d4ffd2ba33
github.com/pingcap/log v1.1.0
github.com/pingcap/tidb v1.1.0-beta.0.20220622125636-a2fe74fc92ed
github.com/pingcap/tidb v1.1.0-beta.0.20220713062705-50437e1d4087
github.com/pingcap/tidb-tools v6.0.1-0.20220516050036-b3ea358e374a+incompatible
github.com/pingcap/tidb/parser v0.0.0-20220622125636-a2fe74fc92ed
github.com/pingcap/tidb/parser v0.0.0-20220713065705-0e13d5d00990
github.com/prometheus/client_golang v1.12.2
github.com/prometheus/client_model v0.2.0
github.com/r3labs/diff v1.1.0
Expand All @@ -69,7 +69,7 @@ require (
github.com/swaggo/gin-swagger v1.2.0
github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/tikv/client-go/v2 v2.0.1-0.20220613112734-be31f33ba03b
github.com/tikv/client-go/v2 v2.0.1-0.20220627063500-947d923945fd
github.com/tikv/pd v1.1.0-beta.0.20220303060546-3695d8164800
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/tinylib/msgp v1.1.6
Expand All @@ -88,13 +88,13 @@ require (
go.uber.org/multierr v1.8.0
go.uber.org/ratelimit v0.2.0
go.uber.org/zap v1.21.0
golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5
golang.org/x/exp v0.0.0-20220428152302-39d4317da171
golang.org/x/net v0.0.0-20220516155154-20f960328961
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a
golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664
golang.org/x/text v0.3.7
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
golang.org/x/tools v0.1.10 // indirect
golang.org/x/tools v0.1.11 // indirect
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3
google.golang.org/grpc v1.46.2
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -218,7 +218,7 @@ require (
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 // indirect
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 // indirect
github.com/pingcap/tipb v0.0.0-20220602075447-4847c5d68e73 // indirect
github.com/pingcap/tipb v0.0.0-20220706024432-7be3cc83a7d5 // indirect
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -229,7 +229,6 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.22.4 // indirect
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
Expand Down
Loading