-
Notifications
You must be signed in to change notification settings - Fork 1
/
doc.go
86 lines (71 loc) · 2.27 KB
/
doc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*
Package binlog 将自己伪装成slave获取mysql主从复杂流来
获取mysql数据库的数据变更,提供轻量级,快速的dump协议交互
以及binlog的row模式下的格式解析。使用方式较为简单,首先你
要实现一个MysqlTableMapper
type mysqlColumnAttribute struct {
field string
typ string
}
func (m *mysqlColumnAttribute) Field() string {
return m.field
}
func (m *mysqlColumnAttribute) IsUnSignedInt() bool {
return strings.Contains(m.typ, mysqlUnsigned)
}
type mysqlTableInfo struct {
name MysqlTableName
columns []MysqlColumn
}
func (m *mysqlTableInfo) Name() MysqlTableName {
return m.name
}
func (m *mysqlTableInfo) Columns() []MysqlColumn {
return m.columns
}
type exampleMysqlTableMapper struct {
db *sql.DB
}
func (e *exampleMysqlTableMapper) MysqlTable(name MysqlTableName) (MysqlTable, error) {
info := &mysqlTableInfo{
name: name,
columns: make([]MysqlColumn, 0, 10),
}
query := "desc " + name.String()
rows, err := e.db.Query(query)
if err != nil {
return info, fmt.Errorf("query failed query: %s, error: %v", query, err)
}
defer rows.Close()
var null,key,extra string
var columnDefault []byte
for i := 0; rows.Next(); i++ {
column := &mysqlColumnAttribute{}
err = rows.Scan(&column.field, &column.typ, &null, &key, &columnDefault, &extra)
if err != nil {
return info, err
}
info.columns = append(info.columns, column)
}
return info, nil
}
再申请一个NewRowStreamer,数据库连接信息为user:password@tcp(ip:port)/db
user是mysql的用户名,password是mysql的密码,ip是mysql的ip地址,
port是mysql的端口,db是mysql的数据库名,serverID要与主库不同,
SetStartBinlogPosition的参数可以通过SHOW MASTER STATUS获取
dsn := "example:example@tcp(localhost:3306)/mysql"
r, err := NewRowStreamer(dsn, 1234, e)
if err != nil {
fmt.Printf("NewRowStreamer fail. err: %v", err)
return
}
r.SetStartBinlogPosition(pos)
然后开启Stream,可以在SendTransactionFun用于处理事务信息函数,如打印事务信息
ctx := context.Background()
err = r.Stream(ctx, func(t *Transaction) error {
fmt.Printf("%v", *t)
return nil
})
最后可以通过ctx的cancal结束本binlog流的同步
*/
package binlog