Skip to content

Commit

Permalink
Merge branch 'master' into 'fix-transfer-raft-leader-while-loading'
Browse files Browse the repository at this point in the history
# Conflicts:
#   node/node.go
#   wal/wal_test.go
  • Loading branch information
absolute8511 committed Jun 18, 2021
2 parents 1181268 + c38ab46 commit 1847e3c
Show file tree
Hide file tree
Showing 71 changed files with 2,131 additions and 514 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:

- name: Get dependencies
run: |
sudo apt-get install libsnappy1v5 libsnappy-dev libjemalloc1 libjemalloc-dev
sudo apt-get install libsnappy1v5 libsnappy-dev libjemalloc2 libjemalloc-dev
git clone https://github.com/absolute8511/rocksdb.git /tmp/rocksdb
pushd /tmp/rocksdb && git checkout v6.4.6-patched && PORTABLE=1 WITH_JEMALLOC_FLAG=1 JEMALLOC=1 make static_lib && popd
Expand All @@ -39,4 +39,4 @@ jobs:
ROCKSDB=/tmp/rocksdb ./test.sh
- name: Codecov
uses: codecov/codecov-action@v1.0.7
uses: codecov/codecov-action@v1.0.7
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ addons:
- g++-4.9
install:
- export CXX="g++-4.9" CC="gcc-4.9"
- sudo apt-get install libsnappy1v5 libsnappy-dev libjemalloc1 libjemalloc-dev
- sudo apt-get install libsnappy1v5 libsnappy-dev libjemalloc2 libjemalloc-dev
- git clone https://github.com/absolute8511/rocksdb.git /tmp/rocksdb
- pushd /tmp/rocksdb && git checkout v6.4.6-patched && PORTABLE=1 WITH_JEMALLOC_FLAG=1 JEMALLOC=1 make static_lib && popd
script:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ PREFIX=/usr/local
DESTDIR=
BINDIR=${PREFIX}/bin
PROJECT?=github.com/youzan/ZanRedisDB
VERBINARY?= 0.9.2
VERBINARY?= 0.9.3
COMMIT?=$(shell git rev-parse --short HEAD)
BUILD_TIME?=$(shell date '+%Y-%m-%d_%H:%M:%S-%Z')
GOFLAGS=-ldflags "-X ${PROJECT}/common.VerBinary=${VERBINARY} -X ${PROJECT}/common.Commit=${COMMIT} -X ${PROJECT}/common.BuildTime=${BUILD_TIME}"
Expand Down
234 changes: 137 additions & 97 deletions apps/backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,36 +205,44 @@ func hbackup(ch chan interface{}, file *os.File, client *sdk.ZanRedisClient) {
tp := []byte{1}
backupCommon(tp, ch, file, func(key []byte, item []interface{}, file *os.File) error {
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(item)))
n, err := file.Write(lenBuf)
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(len(item)))
n, err := file.Write(lenBuf)

if err != nil {
fmt.Printf("write field-value count error.[ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return err
}
if err != nil {
fmt.Printf("write field-value count error.[ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return err
}

if n != 4 {
fmt.Printf("write field-value count length error. [ns=%s, table=%s, key=%s, count=%d, len=%d]\n",
*ns, *table, string(key), len(item), n)
return errWriteLen
if n != 4 {
fmt.Printf("write field-value count length error. [ns=%s, table=%s, key=%s, count=%d, len=%d]\n",
*ns, *table, string(key), len(item), n)
return errWriteLen
}
}

for i := 0; i < len(item); i++ {
fv := item[i].([]interface{})
field := fv[0].([]byte)
fieldLen := len(field)
binary.BigEndian.PutUint32(lenBuf, uint32(fieldLen))
n, err = file.Write(lenBuf)
if err != nil {
fmt.Printf("write field's len error. [ns=%s, table=%s, key=%s, field=%v, err=%v]\n", *ns, *table, string(key), field, err)
return err
}
if n != 4 {
fmt.Printf("write field's len length error. [ns=%s, table=%s, key=%s, field=%v, len=%d]\n", *ns, *table, string(key), field, n)
return errWriteLen
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(fieldLen))
n, err := file.Write(lenBuf)
if err != nil {
fmt.Printf("write field's len error. [ns=%s, table=%s, key=%s, field=%v, err=%v]\n", *ns, *table, string(key), field, err)
return err
}
if n != 4 {
fmt.Printf("write field's len length error. [ns=%s, table=%s, key=%s, field=%v, len=%d]\n", *ns, *table, string(key), field, n)
return errWriteLen
}
}

n, err = file.Write(field)
n, err := file.Write(field)
if err != nil {
fmt.Printf("write field error. [ns=%s, table=%s, key=%s, field=%v, err=%v]\n", *ns, *table, string(key), field, err)
return err
Expand All @@ -246,15 +254,19 @@ func hbackup(ch chan interface{}, file *os.File, client *sdk.ZanRedisClient) {

value := fv[1].([]byte)
valLen := len(value)
binary.BigEndian.PutUint32(lenBuf, uint32(valLen))
n, err = file.Write(lenBuf)
if err != nil {
fmt.Printf("write val's len error. [ns=%s, table=%s, key=%s, val=%v, err=%v]\n", *ns, *table, string(key), value, err)
return err
}
if n != 4 {
fmt.Printf("write val's len length error. [ns=%s, table=%s, key=%s, val=%v, len=%d]\n", *ns, *table, string(key), value, n)
return errWriteLen
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(valLen))
n, err = file.Write(lenBuf)
if err != nil {
fmt.Printf("write val's len error. [ns=%s, table=%s, key=%s, val=%v, err=%v]\n", *ns, *table, string(key), value, err)
return err
}
if n != 4 {
fmt.Printf("write val's len length error. [ns=%s, table=%s, key=%s, val=%v, len=%d]\n", *ns, *table, string(key), value, n)
return errWriteLen
}
}

n, err = file.Write(value)
Expand All @@ -277,35 +289,43 @@ func lbackup(ch chan interface{}, file *os.File, client *sdk.ZanRedisClient) {
tp := []byte{2}
backupCommon(tp, ch, file, func(key []byte, item []interface{}, file *os.File) error {
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(item)))
n, err := file.Write(lenBuf)
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(len(item)))
n, err := file.Write(lenBuf)

if err != nil {
fmt.Printf("write list count error.[ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return err
}
if err != nil {
fmt.Printf("write list count error.[ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return err
}

if n != 4 {
fmt.Printf("write list count length error. [ns=%s, table=%s, key=%s, count=%d, len=%d]\n",
*ns, *table, string(key), len(item), n)
return errWriteLen
if n != 4 {
fmt.Printf("write list count length error. [ns=%s, table=%s, key=%s, count=%d, len=%d]\n",
*ns, *table, string(key), len(item), n)
return errWriteLen
}
}

for i := 0; i < len(item); i++ {
value := item[i].([]byte)
valLen := len(value)
binary.BigEndian.PutUint32(lenBuf, uint32(valLen))
n, err = file.Write(lenBuf)
if err != nil {
fmt.Printf("write val's len error. [ns=%s, table=%s, key=%s, val=%v, err=%v]\n", *ns, *table, string(key), value, err)
return err
}
if n != 4 {
fmt.Printf("write val's len length error. [ns=%s, table=%s, key=%s, val=%v, len=%d]\n", *ns, *table, string(key), value, n)
return errWriteLen
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(valLen))
n, err := file.Write(lenBuf)
if err != nil {
fmt.Printf("write val's len error. [ns=%s, table=%s, key=%s, val=%v, err=%v]\n", *ns, *table, string(key), value, err)
return err
}
if n != 4 {
fmt.Printf("write val's len length error. [ns=%s, table=%s, key=%s, val=%v, len=%d]\n", *ns, *table, string(key), value, n)
return errWriteLen
}
}

n, err = file.Write(value)
n, err := file.Write(value)
if err != nil {
fmt.Printf("write val error. [ns=%s, table=%s, key=%s, val=%v, err=%v]\n", *ns, *table, string(key), value, err)
return err
Expand All @@ -325,34 +345,42 @@ func sbackup(ch chan interface{}, file *os.File, client *sdk.ZanRedisClient) {
tp := []byte{3}
backupCommon(tp, ch, file, func(key []byte, item []interface{}, file *os.File) error {
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(item)))
n, err := file.Write(lenBuf)
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(len(item)))
n, err := file.Write(lenBuf)

if err != nil {
fmt.Printf("write member count error.[ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return err
}
if err != nil {
fmt.Printf("write member count error.[ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return err
}

if n != 4 {
fmt.Printf("write member count length error. [ns=%s, table=%s, key=%s, count=%d, len=%d]\n", *ns, *table, string(key), len(item), n)
return errWriteLen
if n != 4 {
fmt.Printf("write member count length error. [ns=%s, table=%s, key=%s, count=%d, len=%d]\n", *ns, *table, string(key), len(item), n)
return errWriteLen
}
}

for i := 0; i < len(item); i++ {
member := item[i].([]byte)
memberLen := len(member)
binary.BigEndian.PutUint32(lenBuf, uint32(memberLen))
n, err = file.Write(lenBuf)
if err != nil {
fmt.Printf("write member's len error. [ns=%s, table=%s, key=%s, member=%v, err=%v]\n", *ns, *table, string(key), member, err)
return err
}
if n != 4 {
fmt.Printf("write member's len length error. [ns=%s, table=%s, key=%s, member=%v, len=%d]\n", *ns, *table, string(key), member, n)
return errWriteLen
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(memberLen))
n, err := file.Write(lenBuf)
if err != nil {
fmt.Printf("write member's len error. [ns=%s, table=%s, key=%s, member=%v, err=%v]\n", *ns, *table, string(key), member, err)
return err
}
if n != 4 {
fmt.Printf("write member's len length error. [ns=%s, table=%s, key=%s, member=%v, len=%d]\n", *ns, *table, string(key), member, n)
return errWriteLen
}
}

n, err = file.Write(member)
n, err := file.Write(member)
if err != nil {
fmt.Printf("write member error. [ns=%s, table=%s, key=%s, member=%v, err=%v]\n", *ns, *table, string(key), member, err)
return err
Expand All @@ -372,36 +400,44 @@ func zbackup(ch chan interface{}, file *os.File, client *sdk.ZanRedisClient) {
tp := []byte{4}
backupCommon(tp, ch, file, func(key []byte, item []interface{}, file *os.File) error {
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(item)))
n, err := file.Write(lenBuf)
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(len(item)))
n, err := file.Write(lenBuf)

if err != nil {
fmt.Printf("write member-score count error.[ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return err
}
if err != nil {
fmt.Printf("write member-score count error.[ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return err
}

if n != 4 {
fmt.Printf("write member-score count length error. [ns=%s, table=%s, key=%s, count=%d, len=%d]\n",
*ns, *table, string(key), len(item), n)
return errWriteLen
if n != 4 {
fmt.Printf("write member-score count length error. [ns=%s, table=%s, key=%s, count=%d, len=%d]\n",
*ns, *table, string(key), len(item), n)
return errWriteLen
}
}

for i := 0; i < len(item); i++ {
ms := item[i].([]interface{})
member := ms[0].([]byte)
memberLen := len(member)
binary.BigEndian.PutUint32(lenBuf, uint32(memberLen))
n, err = file.Write(lenBuf)
if err != nil {
fmt.Printf("write member's len error. [ns=%s, table=%s, key=%s, member=%v, err=%v]\n", *ns, *table, string(key), member, err)
return err
}
if n != 4 {
fmt.Printf("write member's len length error. [ns=%s, table=%s, key=%s, member=%v, len=%d]\n", *ns, *table, string(key), member, n)
return errWriteLen
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(memberLen))
n, err := file.Write(lenBuf)
if err != nil {
fmt.Printf("write member's len error. [ns=%s, table=%s, key=%s, member=%v, err=%v]\n", *ns, *table, string(key), member, err)
return err
}
if n != 4 {
fmt.Printf("write member's len length error. [ns=%s, table=%s, key=%s, member=%v, len=%d]\n", *ns, *table, string(key), member, n)
return errWriteLen
}
}

n, err = file.Write(member)
n, err := file.Write(member)
if err != nil {
fmt.Printf("write member error. [ns=%s, table=%s, key=%s, member=%v, err=%v]\n", *ns, *table, string(key), member, err)
return err
Expand All @@ -413,15 +449,19 @@ func zbackup(ch chan interface{}, file *os.File, client *sdk.ZanRedisClient) {

score := ms[1].([]byte)
scoreLen := len(score)
binary.BigEndian.PutUint32(lenBuf, uint32(scoreLen))
n, err = file.Write(lenBuf)
if err != nil {
fmt.Printf("write score's len error. [ns=%s, table=%s, key=%s, score=%v, err=%v]\n", *ns, *table, string(key), score, err)
return err
}
if n != 4 {
fmt.Printf("write score's len length error. [ns=%s, table=%s, key=%s, score=%v, len=%d]\n", *ns, *table, string(key), score, n)
return errWriteLen
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(scoreLen))
n, err = file.Write(lenBuf)
if err != nil {
fmt.Printf("write score's len error. [ns=%s, table=%s, key=%s, score=%v, err=%v]\n", *ns, *table, string(key), score, err)
return err
}
if n != 4 {
fmt.Printf("write score's len length error. [ns=%s, table=%s, key=%s, score=%v, len=%d]\n", *ns, *table, string(key), score, n)
return errWriteLen
}
}

n, err = file.Write(score)
Expand Down
9 changes: 4 additions & 5 deletions apps/placedriver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"fmt"
"log"
"os"
"path"
"path/filepath"
"syscall"

"github.com/youzan/ZanRedisDB/common"
"github.com/youzan/ZanRedisDB/pdserver"

"github.com/BurntSushi/toml"
"github.com/absolute8511/glog"
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
)
Expand All @@ -21,6 +21,7 @@ var (
flagSet = flag.NewFlagSet("placedriver", flag.ExitOnError)

config = flagSet.String("config", "", "path to config file")
logAge = flagSet.Int("logage", 0, "the max age (day) log will keep")
showVersion = flagSet.Bool("version", false, "print version string")

httpAddress = flagSet.String("http-address", "0.0.0.0:18001", "<addr>:<port> to listen on for HTTP clients")
Expand Down Expand Up @@ -52,7 +53,7 @@ type program struct {
}

func main() {
defer glog.Flush()
defer common.FlushZapDefault()
prg := &program{}
if err := svc.Run(prg, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGINT); err != nil {
log.Fatal(err)
Expand All @@ -68,8 +69,6 @@ func (p *program) Init(env svc.Environment) error {
}

func (p *program) Start() error {
glog.InitWithFlag(flagSet)

flagSet.Parse(os.Args[1:])
fmt.Println(common.VerString("placedriver"))
if *showVersion {
Expand All @@ -86,7 +85,7 @@ func (p *program) Start() error {

opts := pdserver.NewServerConfig()
options.Resolve(opts, flagSet, cfg)
common.InitDefaultForGLogger(opts.LogDir)
common.SetZapRotateOptions(false, true, path.Join(opts.LogDir, "placedriver.log"), 0, 0, *logAge)
daemon, err := pdserver.NewServer(opts)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 1847e3c

Please sign in to comment.