Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Improve performence (#1170) #1203

Merged
merged 2 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 83 additions & 83 deletions dm/dm-ansible/scripts/dm.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ func extractSources(s *Server, req hasWokers) ([]string, error) {
// query specified task's sources
sources = s.getTaskResources(req.GetName())
if len(sources) == 0 {
return nil, errors.Errorf("task %s has no source or not exist, can try `refresh-worker-tasks` cmd first", req.GetName())
return nil, errors.Errorf("task %s has no source or not exist", req.GetName())
}
} else {
// query all sources
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (t *testMaster) TestQueryStatus(c *check.C) {
})
c.Assert(err, check.IsNil)
c.Assert(resp.Result, check.IsFalse)
c.Assert(resp.Msg, check.Matches, "task .* has no source or not exist, can try `refresh-worker-tasks` cmd first")
c.Assert(resp.Msg, check.Matches, "task .* has no source or not exist")
clearSchedulerEnv(c, cancel, &wg)
// TODO: test query with correct task name, this needs to add task first
}
Expand Down
57 changes: 40 additions & 17 deletions pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strconv"
"strings"

"github.com/siddontang/go-mysql/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"

Expand Down Expand Up @@ -206,10 +207,11 @@ func ComparePosition(pos1, pos2 gmysql.Position) int {
}

// Location is used for save binlog's position and gtid
// TODO: encapsulate all attributes in Location
type Location struct {
Position gmysql.Position

GTIDSet gtid.Set
gtidSet gtid.Set

Suffix int // use for replace event
}
Expand All @@ -218,7 +220,15 @@ type Location struct {
func NewLocation(flavor string) Location {
return Location{
Position: MinPosition,
GTIDSet: gtid.MinGTIDSet(flavor),
gtidSet: gtid.MinGTIDSet(flavor),
}
}

// InitLocation init a new Location
func InitLocation(pos gmysql.Position, gset gtid.Set) Location {
return Location{
Position: pos,
gtidSet: gset,
}
}

Expand All @@ -232,8 +242,8 @@ func (l Location) String() string {
// GTIDSetStr returns gtid set's string
func (l Location) GTIDSetStr() string {
gsetStr := ""
if l.GTIDSet != nil {
gsetStr = l.GTIDSet.String()
if l.gtidSet != nil {
gsetStr = l.gtidSet.String()
}

return gsetStr
Expand All @@ -244,20 +254,11 @@ func (l Location) Clone() Location {
return l.CloneWithFlavor("")
}

// ClonePtr clones a same Location pointer
func (l *Location) ClonePtr() *Location {
if l == nil {
return nil
}
newLocation := l.Clone()
return &newLocation
}

// CloneWithFlavor clones the location, and if the GTIDSet is nil, will create a GTIDSet with specified flavor.
func (l Location) CloneWithFlavor(flavor string) Location {
var newGTIDSet gtid.Set
if l.GTIDSet != nil {
newGTIDSet = l.GTIDSet.Clone()
if l.gtidSet != nil {
newGTIDSet = l.gtidSet.Clone()
} else if len(flavor) != 0 {
newGTIDSet = gtid.MinGTIDSet(flavor)
}
Expand All @@ -267,7 +268,7 @@ func (l Location) CloneWithFlavor(flavor string) Location {
Name: l.Position.Name,
Pos: l.Position.Pos,
},
GTIDSet: newGTIDSet,
gtidSet: newGTIDSet,
Suffix: l.Suffix,
}
}
Expand All @@ -278,7 +279,7 @@ func (l Location) CloneWithFlavor(flavor string) Location {
// -1 if point1 is less than point2
func CompareLocation(location1, location2 Location, cmpGTID bool) int {
if cmpGTID {
cmp, canCmp := CompareGTID(location1.GTIDSet, location2.GTIDSet)
cmp, canCmp := CompareGTID(location1.gtidSet, location2.gtidSet)
if canCmp {
if cmp != 0 {
return cmp
Expand Down Expand Up @@ -346,3 +347,25 @@ func compareIndex(lhs, rhs int) int {
func (l *Location) ResetSuffix() {
l.Suffix = 0
}

// SetGTID set new gtid for location
// Use this func instead of GITSet.Set to avoid change other location
func (l *Location) SetGTID(gset mysql.GTIDSet) error {
flavor := mysql.MySQLFlavor
if _, ok := l.gtidSet.(*gtid.MariadbGTIDSet); ok {
flavor = mysql.MariaDBFlavor
}

newGTID := gtid.MinGTIDSet(flavor)
if err := newGTID.Set(gset); err != nil {
return err
}

l.gtidSet = newGTID
return nil
}

// GetGTID return gtidSet of Location
func (l *Location) GetGTID() gtid.Set {
return l.gtidSet
}
42 changes: 42 additions & 0 deletions pkg/binlog/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,3 +723,45 @@ func (t *testPositionSuite) TestVerifyBinlogPos(c *C) {
}
}
}

func (t *testPositionSuite) TestSetGTID(c *C) {
GTIDSetStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14"
GTIDSetStr2 := "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-15"
gset, _ := gtid.ParserGTID("mysql", GTIDSetStr)
gset2, _ := gtid.ParserGTID("mysql", GTIDSetStr2)
mysqlSet := gset.Origin()
mysqlSet2 := gset2.Origin()

loc := Location{
Position: gmysql.Position{
Name: "mysql-bin.00002",
Pos: 2333,
},
gtidSet: gset,
Suffix: 0,
}
loc2 := loc

c.Assert(CompareLocation(loc, loc2, false), Equals, 0)

loc2.Position.Pos++
c.Assert(loc.Position.Pos, Equals, uint32(2333))
c.Assert(CompareLocation(loc, loc2, false), Equals, -1)

loc2.Position.Name = "mysql-bin.00001"
c.Assert(loc.Position.Name, Equals, "mysql-bin.00002")
c.Assert(CompareLocation(loc, loc2, false), Equals, 1)

// WARN: will change other location's gtid
err := loc2.gtidSet.Set(mysqlSet2)
c.Assert(err, IsNil)
c.Assert(loc.gtidSet.String(), Equals, GTIDSetStr2)
c.Assert(loc2.gtidSet.String(), Equals, GTIDSetStr2)
c.Assert(CompareLocation(loc, loc2, true), Equals, 0)

err = loc2.SetGTID(mysqlSet)
c.Assert(err, IsNil)
c.Assert(loc.gtidSet.String(), Equals, GTIDSetStr2)
c.Assert(loc2.gtidSet.String(), Equals, GTIDSetStr)
c.Assert(CompareLocation(loc, loc2, true), Equals, 1)
}
9 changes: 6 additions & 3 deletions pkg/conn/baseconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,12 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *me
arg = args[i]
}

tctx.L().Debug("execute statement",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)))
// avoid use TruncateInterface for all log level which will slow the speed of DML
if tctx.L().Core().Enabled(zap.DebugLevel) {
tctx.L().Debug("execute statement",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)))
}

startTime = time.Now()
_, err = txn.ExecContext(tctx.Context(), query, arg...)
Expand Down
18 changes: 7 additions & 11 deletions pkg/dumpling/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,
pos2 mysql.Position
gtidStr2 string

loc *binlog.Location
loc2 *binlog.Location
locPtr *binlog.Location
locPtr2 *binlog.Location
)

br := bufio.NewReader(fd)
Expand Down Expand Up @@ -135,10 +135,8 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,
if err != nil {
return nil, nil, invalidErr
}
loc = &binlog.Location{
Position: pos,
GTIDSet: gset,
}
loc := binlog.InitLocation(pos, gset)
locPtr = &loc

if useLocation2 {
if len(pos2.Name) == 0 || pos2.Pos == uint32(0) {
Expand All @@ -148,13 +146,11 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location,
if err != nil {
return nil, nil, invalidErr
}
loc2 = &binlog.Location{
Position: pos2,
GTIDSet: gset2,
}
loc2 := binlog.InitLocation(pos2, gset2)
locPtr2 = &loc2
}

return loc, loc2, nil
return locPtr, locPtr2, nil
}

func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/dumpling/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,11 @@ Finished dump at: 2020-09-30 12:16:49
c.Assert(err, IsNil)
c.Assert(loc.Position, DeepEquals, tc.pos)
gs, _ := gtid.ParserGTID("mysql", tc.gsetStr)
c.Assert(loc.GTIDSet, DeepEquals, gs)
c.Assert(loc.GetGTID(), DeepEquals, gs)
if tc.loc2 {
c.Assert(loc2.Position, DeepEquals, tc.pos2)
gs2, _ := gtid.ParserGTID("mysql", tc.gsetStr2)
c.Assert(loc2.GTIDSet, DeepEquals, gs2)
c.Assert(loc2.GetGTID(), DeepEquals, gs2)
} else {
c.Assert(loc2, IsNil)
}
Expand Down
47 changes: 30 additions & 17 deletions pkg/metricsproxy/countervec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@ import (
type CounterVecProxy struct {
mu sync.Mutex

LabelNames []string
Labels map[string]map[string]string
LabelNamesIndex map[string]int
Labels map[string][]string
*prometheus.CounterVec
}

// NewCounterVec creates a new CounterVec based on the provided CounterOpts and
// partitioned by the given label names.
func NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *CounterVecProxy {
return &CounterVecProxy{
LabelNames: labelNames,
Labels: make(map[string]map[string]string),
CounterVec: prometheus.NewCounterVec(opts, labelNames),
counterVecProxy := &CounterVecProxy{
LabelNamesIndex: make(map[string]int, len(labelNames)),
Labels: make(map[string][]string),
CounterVec: prometheus.NewCounterVec(opts, labelNames),
}
for idx, v := range labelNames {
counterVecProxy.LabelNamesIndex[v] = idx
}
return counterVecProxy
}

// WithLabelValues works as GetMetricWithLabelValues, but panics where
Expand All @@ -44,13 +48,7 @@ func NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *CounterVec
// myVec.WithLabelValues("404", "GET").Add(42)
func (c *CounterVecProxy) WithLabelValues(lvs ...string) prometheus.Counter {
if len(lvs) > 0 {
labels := make(map[string]string, len(lvs))
for index, label := range lvs {
labels[c.LabelNames[index]] = label
}
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
noteLabelsInMetricsProxy(c, lvs)
}
return c.CounterVec.WithLabelValues(lvs...)
}
Expand All @@ -60,9 +58,12 @@ func (c *CounterVecProxy) WithLabelValues(lvs ...string) prometheus.Counter {
// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42)
func (c *CounterVecProxy) With(labels prometheus.Labels) prometheus.Counter {
if len(labels) > 0 {
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
values := make([]string, len(labels))
labelNameIndex := c.GetLabelNamesIndex()
for k, v := range labels {
values[labelNameIndex[k]] = v
}
noteLabelsInMetricsProxy(c, values)
}

return c.CounterVec.With(labels)
Expand All @@ -78,11 +79,23 @@ func (c *CounterVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool {
return findAndDeleteLabelsInMetricsProxy(c, labels)
}

// GetLabelNamesIndex to support get CounterVecProxy's LabelNames when you use Proxy object
func (c *CounterVecProxy) GetLabelNamesIndex() map[string]int {
return c.LabelNamesIndex
}

// GetLabels to support get CounterVecProxy's Labels when you use Proxy object
func (c *CounterVecProxy) GetLabels() map[string]map[string]string {
func (c *CounterVecProxy) GetLabels() map[string][]string {
return c.Labels
}

// SetLabel to support set CounterVecProxy's Label when you use Proxy object
func (c *CounterVecProxy) SetLabel(key string, vals []string) {
c.mu.Lock()
defer c.mu.Unlock()
c.Labels[key] = vals
}

// vecDelete to support delete CounterVecProxy's Labels when you use Proxy object
func (c *CounterVecProxy) vecDelete(labels prometheus.Labels) bool {
return c.CounterVec.Delete(labels)
Expand Down
Loading