Skip to content

Commit

Permalink
update deps refine some log (#814) (#816)
Browse files Browse the repository at this point in the history
* update deps refine some log (#814)

* update deps
  • Loading branch information
july2993 authored and IANTHEREAL committed Nov 20, 2019
1 parent 2059927 commit a0679c5
Show file tree
Hide file tree
Showing 16 changed files with 157 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ TEST_DIR := /tmp/tidb_binlog_test

GO := GO111MODULE=on go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOTEST := CGO_ENABLED=1 $(GO) test -p 3
GOTEST := CGO_ENABLED=1 $(GO) test -p 1
GOVERSION := "`go version`"

ARCH := "`uname -s`"
Expand Down
8 changes: 6 additions & 2 deletions arbiter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
LogLevel string `toml:"log-level" json:"log-level"`
ListenAddr string `toml:"addr" json:"addr"`
LogFile string `toml:"log-file" json:"log-file"`
OpenSaramaLog bool `toml:"open-sarama-log" json:"open-sarama-log"`

Up UpConfig `toml:"up" json:"up"`
Down DownConfig `toml:"down" json:"down"`
Expand Down Expand Up @@ -73,8 +74,9 @@ type DownConfig struct {
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`

WorkerCount int `toml:"worker-count" json:"worker-count"`
BatchSize int `toml:"batch-size" json:"batch-size"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
BatchSize int `toml:"batch-size" json:"batch-size"`
SafeMode bool `toml:"safe-mode" json:"safe-mode"`
}

// NewConfig return an instance of configuration
Expand All @@ -94,12 +96,14 @@ func NewConfig() *Config {
fs.StringVar(&cfg.Metrics.Addr, "metrics.addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
fs.IntVar(&cfg.Metrics.Interval, "metrics.interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push")
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
fs.BoolVar(&cfg.OpenSaramaLog, "open-sarama-log", true, "save the logs from sarama (https://github.com/Shopify/sarama), a client of Kafka")

fs.Int64Var(&cfg.Up.InitialCommitTS, "up.initial-commit-ts", 0, "if arbiter doesn't have checkpoint, use initial commitTS to initial checkpoint")
fs.StringVar(&cfg.Up.Topic, "up.topic", "", "topic name of kafka")

fs.IntVar(&cfg.Down.WorkerCount, "down.worker-count", 16, "concurrency write to downstream")
fs.IntVar(&cfg.Down.BatchSize, "down.batch-size", 64, "batch size write to downstream")
fs.BoolVar(&cfg.Down.SafeMode, "safe-mode", false, "enable safe mode to make reentrant")

return cfg
}
Expand Down
24 changes: 16 additions & 8 deletions arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ func NewServer(cfg *Config) (srv *Server, err error) {
return nil, errors.Trace(err)
}

log.Info("new kafka reader success")

// set loader
srv.load, err = newLoader(srv.downDB,
loader.WorkerCount(cfg.Down.WorkerCount),
Expand All @@ -122,15 +124,19 @@ func NewServer(cfg *Config) (srv *Server, err error) {
return nil, errors.Trace(err)
}

// set safe mode in first 5 min if abnormal quit last time
if status == StatusRunning {
log.Info("set safe mode to be true")
if down.SafeMode {
srv.load.SetSafeMode(true)
go func() {
time.Sleep(initSafeModeDuration)
srv.load.SetSafeMode(false)
log.Info("set safe mode to be false")
}()
} else {
// set safe mode in first 5 min if abnormal quit last time
if status == StatusRunning {
log.Info("set safe mode to be true")
srv.load.SetSafeMode(true)
go func() {
time.Sleep(initSafeModeDuration)
srv.load.SetSafeMode(false)
log.Info("set safe mode to be false")
}()
}
}

// set metrics
Expand Down Expand Up @@ -267,8 +273,10 @@ func (s *Server) loadStatus() (int, error) {
if !errors.IsNotFound(err) {
return 0, errors.Trace(err)
}
log.Info("no checkpoint found")
err = nil
} else {
log.Info("load checkpoint", zap.Int64("ts", ts), zap.Int("status", status))
s.finishTS = ts
}
return status, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion binlogctl/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"testing"
"time"

"github.com/coreos/etcd/integration"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-binlog/pkg/etcd"
"github.com/pingcap/tidb-binlog/pkg/node"
"go.etcd.io/etcd/integration"
)

func Test(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions cmd/arbiter/arbiter.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ password = ""
# worker-count = 16
# max DML operation in a transaction when write to downstream
# batch-size = 64
# safe-mode = false
8 changes: 6 additions & 2 deletions cmd/arbiter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ func main() {
if err := util.InitLogger(cfg.LogLevel, cfg.LogFile); err != nil {
log.Fatal("Failed to initialize log", zap.Error(err))
}
// may too many noise, discard sarama log now
sarama.Logger = stdlog.New(ioutil.Discard, "[Sarama] ", stdlog.LstdFlags)

// We have set sarama.Logger in util.InitLogger.
if !cfg.OpenSaramaLog {
// may too many noise, discard sarama log now
sarama.Logger = stdlog.New(ioutil.Discard, "[Sarama] ", stdlog.LstdFlags)
}

log.Info("start arbiter...", zap.Reflect("config", cfg))
version.PrintVersionInfo("Arbiter")
Expand Down
2 changes: 1 addition & 1 deletion drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/coreos/etcd/integration"
. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/util"
pkgzk "github.com/pingcap/tidb-binlog/pkg/zk"
"github.com/samuel/go-zookeeper/zk"
"go.etcd.io/etcd/integration"
)

var testEtcdCluster *integration.ClusterV3
Expand Down
2 changes: 1 addition & 1 deletion drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (g *taskGroup) start(name string, f func(), noPanic bool) {
if err := recover(); err != nil {
log.Error("Recovered from panic",
zap.Reflect("err", err),
zap.Stack("stack"),
zap.Stack("real stack"),
fName,
)
}
Expand Down
3 changes: 2 additions & 1 deletion drainer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ package drainer

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb-binlog/pkg/util"
)

type taskGroupSuite struct{}

var _ = Suite(&taskGroupSuite{})

/* May only get one log entry
func (s *taskGroupSuite) TestShouldRecoverFromPanic(c *C) {
var logHook util.LogHook
logHook.SetUp()
Expand All @@ -39,3 +39,4 @@ func (s *taskGroupSuite) TestShouldRecoverFromPanic(c *C) {
c.Assert(logHook.Entrys[0].Message, Matches, ".*Recovered.*")
c.Assert(logHook.Entrys[1].Message, Matches, ".*Exit.*")
}
*/
38 changes: 22 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,50 @@ require (
github.com/BurntSushi/toml v0.3.1
github.com/DATA-DOG/go-sqlmock v1.3.0
github.com/Shopify/sarama v1.23.1
github.com/coreos/etcd v3.3.13+incompatible
github.com/dustin/go-humanize v1.0.0
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.2.0
github.com/gogo/protobuf v1.2.1
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.2.0
github.com/golang/protobuf v1.3.1
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0 // indirect
github.com/google/gofuzz v1.0.0
github.com/gorilla/mux v1.6.2
github.com/grpc-ecosystem/grpc-gateway v1.7.0 // indirect
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errors v0.11.4
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20191018040038-555b97093a2a
github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd
github.com/pingcap/tidb v1.1.0-beta.0.20191018094050-67d8fbf23eee
github.com/pingcap/tidb-tools v2.1.12+incompatible
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330
github.com/prometheus/client_golang v0.9.0
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
github.com/pingcap/tidb v1.1.0-beta.0.20191120052905-d9254d33ade3
github.com/pingcap/tidb-tools v3.0.6-0.20191119150227-ff0a3c6e5763+incompatible
github.com/pingcap/tipb v0.0.0-20191031111650-d14196d52154
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
github.com/samuel/go-zookeeper v0.0.0-20170815201139-e6b59f6144be
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
github.com/siddontang/go-mysql v0.0.0-20190618002340-dbe0224ac097 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.3 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d
github.com/zanmato1984/clickhouse v1.3.4-0.20181106115746-3e9a6b9beb12
go.uber.org/zap v1.9.1
go.etcd.io/etcd v0.5.0-alpha.5.0.20190320044326-77d4b742cdbf
go.uber.org/multierr v1.4.0 // indirect
go.uber.org/zap v1.12.0
golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf // indirect
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20190909082730-f460065e899a
golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect
golang.org/x/tools v0.0.0-20191107010934-f79515f33823 // indirect
google.golang.org/appengine v1.4.0 // indirect
google.golang.org/grpc v1.17.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
)

Expand Down
Loading

0 comments on commit a0679c5

Please sign in to comment.