Skip to content

Commit

Permalink
pump client: add compatible with kafka version && refine retry && add…
Browse files Browse the repository at this point in the history
… strategy config (#223)

* pump client: compatible with kafka version tidb-binlog && add unit test (#139)

* pump client: write commit binlog will never return error (#148)

* pkg watcher: move watcher from tidb-enterprise-tools (#146)

* pump client: increase retry time, and refine some code (#158)

* pump client: add initial log function (#165)

* pump client: support change select pump's strategy (#221)
  • Loading branch information
WangXiangUSTC authored and IANTHEREAL committed Mar 29, 2019
1 parent 419f308 commit 64d87c4
Show file tree
Hide file tree
Showing 14 changed files with 1,287 additions and 292 deletions.
2 changes: 1 addition & 1 deletion checker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"os"

_ "github.com/go-sql-driver/mysql"
"github.com/ngaut/log"
"github.com/pingcap/tidb-tools/pkg/check"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/utils"
log "github.com/sirupsen/logrus"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion dump_region/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
Expand All @@ -25,7 +26,6 @@ import (
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"golang.org/x/net/context"
)

var (
Expand Down
21 changes: 12 additions & 9 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
package etcd

import (
"context"
"crypto/tls"
"path"
"strings"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/pingcap/errors"
"golang.org/x/net/context"
)

// Node organizes the ectd query result as a Trie tree
Expand Down Expand Up @@ -89,18 +89,18 @@ func (e *Client) Create(ctx context.Context, key string, val string, opts []clie
}

// Get returns a key/value matchs the given key
func (e *Client) Get(ctx context.Context, key string) ([]byte, error) {
func (e *Client) Get(ctx context.Context, key string) (value []byte, revision int64, err error) {
key = keyWithPrefix(e.rootPath, key)
resp, err := e.client.KV.Get(ctx, key)
if err != nil {
return nil, errors.Trace(err)
return nil, -1, errors.Trace(err)
}

if len(resp.Kvs) == 0 {
return nil, errors.NotFoundf("key %s in etcd", key)
return nil, -1, errors.NotFoundf("key %s in etcd", key)
}

return resp.Kvs[0].Value, nil
return resp.Kvs[0].Value, resp.Header.Revision, nil
}

// Update updates a key/value.
Expand Down Expand Up @@ -156,15 +156,15 @@ func (e *Client) UpdateOrCreate(ctx context.Context, key string, val string, ttl
}

// List returns the trie struct that constructed by the key/value with same prefix
func (e *Client) List(ctx context.Context, key string) (*Node, error) {
func (e *Client) List(ctx context.Context, key string) (node *Node, revision int64, err error) {
key = keyWithPrefix(e.rootPath, key)
if !strings.HasSuffix(key, "/") {
key += "/"
}

resp, err := e.client.KV.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
return nil, errors.Trace(err)
return nil, -1, errors.Trace(err)
}

root := new(Node)
Expand All @@ -180,7 +180,7 @@ func (e *Client) List(ctx context.Context, key string) (*Node, error) {
tailNode.Value = kv.Value
}

return root, nil
return root, resp.Header.Revision, nil
}

// Delete deletes the key/values with matching prefix or key
Expand All @@ -200,7 +200,10 @@ func (e *Client) Delete(ctx context.Context, key string, withPrefix bool) error
}

// Watch watchs the events of key with prefix.
func (e *Client) Watch(ctx context.Context, prefix string) clientv3.WatchChan {
func (e *Client) Watch(ctx context.Context, prefix string, revision int64) clientv3.WatchChan {
if revision > 0 {
return e.client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
}
return e.client.Watch(ctx, prefix, clientv3.WithPrefix())
}

Expand Down
28 changes: 20 additions & 8 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
package etcd

import (
"context"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/pingcap/check"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"golang.org/x/net/context"
)

var (
Expand Down Expand Up @@ -71,7 +72,7 @@ func (t *testEtcdSuite) TestCreateWithTTL(c *C) {
c.Assert(err, IsNil)

time.Sleep(2 * time.Second)
_, err = etcdCli.Get(ctx, key)
_, _, err = etcdCli.Get(ctx, key)
c.Assert(errors.IsNotFound(err), IsTrue)
}

Expand Down Expand Up @@ -99,19 +100,25 @@ func (t *testEtcdSuite) TestUpdate(c *C) {
err = etcdCli.Create(ctx, key, obj1, opts)
c.Assert(err, IsNil)

res, revision1, err := etcdCli.Get(ctx, key)
c.Assert(err, IsNil)
c.Assert(string(res), Equals, obj1)

time.Sleep(time.Second)

err = etcdCli.Update(ctx, key, obj2, 3)
c.Assert(err, IsNil)

time.Sleep(2 * time.Second)

res, err := etcdCli.Get(ctx, key)
// the new revision should greater than the old
res, revision2, err := etcdCli.Get(ctx, key)
c.Assert(err, IsNil)
c.Assert(string(res), Equals, obj2)
c.Assert(revision2, check.Greater, revision1)

time.Sleep(2 * time.Second)
res, err = etcdCli.Get(ctx, key)
res, _, err = etcdCli.Get(ctx, key)
c.Assert(errors.IsNotFound(err), IsTrue)
}

Expand Down Expand Up @@ -142,12 +149,17 @@ func (t *testEtcdSuite) TestList(c *C) {
err = etcdCli.Create(ctx, k11, k11, nil)
c.Assert(err, IsNil)

root, err := etcdCli.List(ctx, key)
root, revision1, err := etcdCli.List(ctx, key)
c.Assert(err, IsNil)
c.Assert(string(root.Childs["level1"].Value), Equals, k1)
c.Assert(string(root.Childs["level1"].Childs["level1"].Value), Equals, k11)
c.Assert(string(root.Childs["level2"].Value), Equals, k2)
c.Assert(string(root.Childs["level3"].Value), Equals, k3)

// the revision of list should equal to the latest update's revision
_, revision2, err := etcdCli.Get(ctx, k11)
c.Assert(err, IsNil)
c.Assert(revision1, Equals, revision2)
}

func (t *testEtcdSuite) TestDelete(c *C) {
Expand All @@ -158,21 +170,21 @@ func (t *testEtcdSuite) TestDelete(c *C) {
c.Assert(err, IsNil)
}

root, err := etcdCli.List(ctx, key)
root, _, err := etcdCli.List(ctx, key)
c.Assert(err, IsNil)
c.Assert(root.Childs, HasLen, 2)

err = etcdCli.Delete(ctx, keys[1], false)
c.Assert(err, IsNil)

root, err = etcdCli.List(ctx, key)
root, _, err = etcdCli.List(ctx, key)
c.Assert(err, IsNil)
c.Assert(root.Childs, HasLen, 1)

err = etcdCli.Delete(ctx, key, true)
c.Assert(err, IsNil)

root, err = etcdCli.List(ctx, key)
root, _, err = etcdCli.List(ctx, key)
c.Assert(err, IsNil)
c.Assert(root.Childs, HasLen, 0)
}
Expand Down
75 changes: 75 additions & 0 deletions pkg/watcher/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package watcher

import (
"bytes"
"os"
)

// Op represents file operation type
type Op uint32

// Operations type current supported
const (
Create Op = 1 << iota
Remove
Modify
Rename
Chmod
Move
)

func (op Op) String() string {
var buffer bytes.Buffer

// now, only one Op will used in polling, but it can combine multi Ops if needed
if op&Create == Create {
buffer.WriteString("|CREATE")
}
if op&Remove == Remove {
buffer.WriteString("|REMOVE")
}
if op&Modify == Modify {
buffer.WriteString("|MODIFY")
}
if op&Rename == Rename {
buffer.WriteString("|RENAME")
}
if op&Chmod == Chmod {
buffer.WriteString("|CHMOD")
}
if op&Move == Move {
buffer.WriteString("|MOVE")
}
if buffer.Len() == 0 {
return ""
}
return buffer.String()[1:] // Strip leading pipe
}

// Event represents a single file operation event
type Event struct {
Path string
Op Op
FileInfo os.FileInfo
}

// IsDirEvent returns whether is a event for a directory
func (e *Event) IsDirEvent() bool {
if e == nil {
return false
}
return e.FileInfo.IsDir()
}

// HasOps checks whether has any specified operation types
func (e *Event) HasOps(ops ...Op) bool {
if e == nil {
return false
}
for _, op := range ops {
if e.Op&op != 0 {
return true
}
}
return false
}
Loading

0 comments on commit 64d87c4

Please sign in to comment.