Skip to content

Commit

Permalink
use DSN as checkpoint zpath
Browse files Browse the repository at this point in the history
  • Loading branch information
funkygao committed Apr 17, 2017
1 parent d90e6f7 commit 70d38bb
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ _testmain.go
hh/
.target/
var/
engine.png

cmd/dbusd/dbusd
pkg/kafka/kpub/kpub
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ dbus uses epoch to solve this issue.

### TODO

- [ ] when Input stops, Output might still need its OnAck
- [ ] resource group
- [ ] kafka producer qos
- [ ] myslave should have no checkpoint, placed in Input
Expand Down
2 changes: 2 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ func (e *Engine) ServeForever() (ret error) {
close(c)
}

// needn't close registered zkzones, they will not raise leakage

if telemetry.Default != nil {
telemetry.Default.Stop()
}
Expand Down
3 changes: 2 additions & 1 deletion etc/canal_many.cf
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@

{
name: "out.mock"
class: "MockOutput"
class: "KafkaOutput"
metrics: false
dsn: "kafka:local://me/foobar"
match: ["in.test", "in.mysql", ]
}
]
Expand Down
5 changes: 0 additions & 5 deletions pkg/checkpoint/store/zk/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package zk

import (
"path"
"strings"
"time"

"github.com/funkygao/dbus/pkg/checkpoint"
Expand All @@ -26,10 +25,6 @@ type checkpointZK struct {
}

func New(zkzone *zk.ZkZone, state checkpoint.State, zroot string, zpath string, interval time.Duration) checkpoint.Checkpoint {
if strings.Contains(zpath, "/") {
panic("zpath illegal")
}

if len(zroot) > 0 {
root = zroot
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/checkpoint/store/zk/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func init() {

func TestRealPath(t *testing.T) {
s := binlog.New("", "")
assert.Equal(t, "/dbus/checkpoint/myslave/12.12.1.2:3334", realPath(s, "12.12.1.2:3334"))
assert.Equal(t, "/dbus/checkpoint/myslave/12.12.1.2%3A3334", realPath(s, "12.12.1.2:3334"))
}

func TestCheckpointZKBinlog(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/checkpoint/store/zk/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (m *manager) AllStates() ([]checkpoint.State, error) {
// FIXME ugly design
switch scheme {
case "myslave":
dsn, err = decodeDSN(dsn)
if err != nil {
return nil, err
}
s := binlog.New(dsn, "") // empty name is ok, wait for Unmarshal
s.Unmarshal(data)
r = append(r, s)
Expand Down
7 changes: 6 additions & 1 deletion pkg/checkpoint/store/zk/path.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package zk

import (
"net/url"
"path"

"github.com/funkygao/dbus/pkg/checkpoint"
Expand All @@ -9,5 +10,9 @@ import (
var root = "/dbus/checkpoint"

func realPath(state checkpoint.State, zpath string) string {
return path.Join(root, state.Scheme(), zpath)
return path.Join(root, state.Scheme(), url.QueryEscape(zpath))
}

func decodeDSN(dsn string) (string, error) {
return url.QueryUnescape(dsn)
}
4 changes: 2 additions & 2 deletions pkg/myslave/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func (m *MySlave) handleRowsEvent(f string, h *replication.EventHeader, e *repli
schema := string(e.Table.Schema)
table := string(e.Table.Table)
if !m.Predicate(schema, table) {
log.Debug("[%s] ignored[%s.%s]: %+v %+v", m.masterAddr, schema, table, h, e)
log.Debug("[%s] ignored[%s.%s]: %+v %+v", m.dsn, schema, table, h, e)
m.commitPosition(f, h.LogPos) // FIXME batcher partial failure?
return
}
Expand All @@ -27,7 +27,7 @@ func (m *MySlave) handleRowsEvent(f string, h *replication.EventHeader, e *repli
action = "U"

default:
log.Warn("[%s] %s not supported: %+v", m.masterAddr, h.EventType, e)
log.Warn("[%s] %s not supported: %+v", m.dsn, h.EventType, e)
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/myslave/myslave.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (m *MySlave) LoadConfig(config *conf.Conf) *MySlave {

m.m = newMetrics(m.name)
m.p = czk.New(engine.Globals().GetOrRegisterZkzone(zone), m.state, m.zrootCheckpoint,
m.masterAddr, m.c.Duration("pos_commit_interval", time.Second))
m.dsn, m.c.Duration("pos_commit_interval", time.Second))

return m
}
Expand Down

0 comments on commit 70d38bb

Please sign in to comment.