Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into rpc-process-tuning
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored May 30, 2019
2 parents b731a87 + d314b5f commit 6ab4d6c
Show file tree
Hide file tree
Showing 17 changed files with 921 additions and 124 deletions.
6 changes: 4 additions & 2 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1865,7 +1865,9 @@ func (s *Server) waitOperationOk(ctx context.Context, cli workerrpc.Client, name
log.Errorf("fail to query task operation %v", err)
} else {
respLog := resp.QueryTaskOperation.Log
if respLog.Success {
if respLog == nil {
return errors.Errorf("operation %d of task %s not found, please execute `query-status` to check status", opLogID, name)
} else if respLog.Success {
return nil
} else if len(respLog.Message) != 0 {
return errors.New(respLog.Message)
Expand All @@ -1880,7 +1882,7 @@ func (s *Server) waitOperationOk(ctx context.Context, cli workerrpc.Client, name
}
}

return errors.New("request is timeout, but request may be successful")
return errors.New("request is timeout, but request may be successful, please execute `query-status` to check status")
}

func (s *Server) handleOperationResult(ctx context.Context, cli workerrpc.Client, name string, err error, resp *workerrpc.Response) *pb.OperateSubTaskResponse {
Expand Down
39 changes: 34 additions & 5 deletions dm/worker/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,49 @@
package worker

import (
"path"

. "github.com/pingcap/check"
)

func (t *testWorker) TestConfig(c *C) {
cfg := &Config{}
func (t *testServer) TestConfig(c *C) {
cfg := NewConfig()

err := cfg.configFromFile("./dm-worker.toml")
c.Assert(err, IsNil)
c.Assert(cfg.SourceID, Equals, "mysql-replica-01")
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml", "-relay-dir=./xx"}), IsNil)
c.Assert(cfg.RelayDir, Equals, "./xx")
c.Assert(cfg.ServerID, Equals, 101)

dir := c.MkDir()
cfg.ConfigFile = path.Join(dir, "dm-worker.toml")

// test clone
clone1 := cfg.Clone()
c.Assert(cfg, DeepEquals, clone1)
clone1.ServerID = 100
c.Assert(cfg.ServerID, Equals, 101)

// test format
c.Assert(cfg.String(), Matches, `.*"server-id":101.*`)
tomlStr, err := clone1.Toml()
c.Assert(err, IsNil)
c.Assert(tomlStr, Matches, `(.|\n)*server-id = 100(.|\n)*`)
originCfgStr, err := cfg.Toml()
c.Assert(err, IsNil)
c.Assert(originCfgStr, Matches, `(.|\n)*server-id = 101(.|\n)*`)

// test update config file and reload
c.Assert(cfg.UpdateConfigFile(tomlStr), IsNil)
cfg.Reload()
c.Assert(err, IsNil)
c.Assert(cfg.ServerID, Equals, 100)
c.Assert(cfg.UpdateConfigFile(originCfgStr), IsNil)
cfg.Reload()
c.Assert(err, IsNil)
c.Assert(cfg.ServerID, Equals, 101)

// test decrypt password
clone1.From.Password = "1234"
clone1.ServerID = 101
clone2, err := cfg.DecryptPassword()
c.Assert(err, IsNil)
c.Assert(clone2, DeepEquals, clone1)
Expand Down
28 changes: 28 additions & 0 deletions dm/worker/hub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package worker

import (
. "github.com/pingcap/check"
)

func (t *testServer) testConidtionHub(c *C, s *Server) {
// test condition hub
c.Assert(GetConditionHub(), NotNil)
c.Assert(GetConditionHub().w, DeepEquals, s.worker)

InitConditionHub(&Worker{})
c.Assert(GetConditionHub(), NotNil)
c.Assert(GetConditionHub().w, DeepEquals, s.worker)
}
14 changes: 10 additions & 4 deletions dm/worker/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ import (
// ErrInValidHandler indicates we meet an invalid Putter/Getter/Deleter
var ErrInValidHandler = errors.New("handler is nil, please pass a leveldb.DB or leveldb.Transaction")

var (
// GCBatchSize is batch size for gc process
GCBatchSize = 1024
// GCInterval is the interval to gc
GCInterval = time.Hour
)

// Putter is interface which has Put method
type Putter interface {
Put(key, value []byte, opts *opt.WriteOptions) error
Expand Down Expand Up @@ -279,13 +286,12 @@ func (logger *Logger) Append(db Putter, opLog *pb.TaskLog) error {

// GC deletes useless log
func (logger *Logger) GC(ctx context.Context, db *leveldb.DB) {
ticker := time.NewTicker(time.Hour)
ticker := time.NewTicker(GCInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Infof("[task log gc] goroutine exits!")
log.Infof("[task log gc] goroutine exist!")
return
case <-ticker.C:
var gcID int64
Expand Down Expand Up @@ -321,7 +327,7 @@ func (logger *Logger) doGC(db *leveldb.DB, id int64) {
}

batch.Delete(iter.Key())
if batch.Len() == 1024 {
if batch.Len() == GCBatchSize {
err := db.Write(batch, nil)
if err != nil {
log.Errorf("[task log gc] fail to delete keys from kv db %v", err)
Expand Down
22 changes: 13 additions & 9 deletions dm/worker/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,10 @@
package worker

import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/dm/dm/pb"
)

func TestLog(t *testing.T) {
TestingT(t)
}

type testLog struct{}

var _ = Suite(&testLog{})
Expand Down Expand Up @@ -222,22 +216,32 @@ func (t *testLog) TestTaskLogGC(c *C) {
c.Assert(db.Put(EncodeTaskLogKey(30), log2Bytes, nil), IsNil)

taskLog3 := &pb.TaskLog{
Id: 40,
Task: testTask1Meta,
Ts: 40,
}
log3Bytes, err := taskLog3.Marshal()
c.Assert(err, IsNil)
c.Assert(db.Put(EncodeTaskLogKey(40), log3Bytes, nil), IsNil)

taskLog4 := &pb.TaskLog{
Id: 60,
Task: testTask1Meta,
Ts: 60,
}
log3Bytes, err := taskLog3.Marshal()
log4Bytes, err := taskLog4.Marshal()
c.Assert(err, IsNil)
c.Assert(db.Put(EncodeTaskLogKey(60), log3Bytes, nil), IsNil)
c.Assert(db.Put(EncodeTaskLogKey(60), log4Bytes, nil), IsNil)

// forward
c.Assert(logger.ForwardTo(db, 59), IsNil)

// gc
GCBatchSize = 2
logger.doGC(db, 59)

logs, err := logger.Initial(db)
c.Assert(logs, DeepEquals, []*pb.TaskLog{taskLog3})
c.Assert(logs, DeepEquals, []*pb.TaskLog{taskLog4})
c.Assert(logger.handledPointer.Location, Equals, int64(59))
c.Assert(logger.endPointer.Location, Equals, int64(61))
}
Expand Down
5 changes: 0 additions & 5 deletions dm/worker/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"io/ioutil"
"os"
"path"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/dm/dm/config"
Expand All @@ -26,10 +25,6 @@ import (
"github.com/syndtr/goleveldb/leveldb"
)

func TestMeta(t *testing.T) {
TestingT(t)
}

var (
testTask1 = &config.SubTaskConfig{
Name: "task1",
Expand Down
Loading

0 comments on commit 6ab4d6c

Please sign in to comment.