-
Notifications
You must be signed in to change notification settings - Fork 1
/
example_test.go
168 lines (143 loc) · 3.69 KB
/
example_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package binlog
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"os/signal"
"strings"
//_ "github.com/go-sql-driver/mysql" you need it in you own project
)
const (
mysqlUnsigned = "unsigned" //无符号
)
//列属性
type mysqlColumnAttribute struct {
field string //列名
typ string //列类型
null string //是否为空
key string //PRI代表主键,UNI代表唯一索引
columnDefault []byte //默认值
extra string //其他备注信息
}
func (m *mysqlColumnAttribute) Field() string {
return m.field
}
func (m *mysqlColumnAttribute) IsUnSignedInt() bool {
return strings.Contains(m.typ, mysqlUnsigned)
}
type mysqlTableInfo struct {
name MysqlTableName
columns []MysqlColumn
}
func (m *mysqlTableInfo) Name() MysqlTableName {
return m.name
}
func (m *mysqlTableInfo) Columns() []MysqlColumn {
return m.columns
}
type exampleMysqlTableMapper struct {
db *sql.DB
}
func (e *exampleMysqlTableMapper) GetBinlogFormat() (format FormatType, err error) {
query := "SHOW VARIABLES LIKE 'binlog_format'"
var name, str string
err = e.db.QueryRow(query).Scan(&name, &str)
if err != nil {
err = fmt.Errorf("QueryRow fail. query: %s, error: %v", query, err)
return
}
format = FormatType(str)
return
}
func (e *exampleMysqlTableMapper) GetBinlogPosition() (pos Position, err error) {
query := "SHOW MASTER STATUS"
var metaDoDb, metaIgnoreDb, executedGTidSet string
err = e.db.QueryRow(query).Scan(&pos.Filename, &pos.Offset, &metaDoDb, &metaIgnoreDb, &executedGTidSet)
if err != nil {
err = fmt.Errorf("query fail. query: %s, error: %v", query, err)
return
}
return
}
func (e *exampleMysqlTableMapper) MysqlTable(name MysqlTableName) (MysqlTable, error) {
info := &mysqlTableInfo{
name: name,
columns: make([]MysqlColumn, 0, 10),
}
query := "desc " + name.String()
rows, err := e.db.Query(query)
if err != nil {
return info, fmt.Errorf("query failed query: %s, error: %v", query, err)
}
defer rows.Close()
for i := 0; rows.Next(); i++ {
column := &mysqlColumnAttribute{}
err = rows.Scan(&column.field, &column.typ, &column.null, &column.key, &column.columnDefault, &column.extra)
if err != nil {
return info, err
}
info.columns = append(info.columns, column)
}
return info, nil
}
func showTransaction(t *Transaction) {
b, err := t.MarshalJSON()
if err != nil {
lw.logger().Errorf("MarshalJSON fail. err: %v", err)
return
}
lw.logger().Print("%v", string(b))
}
func ExampleRowStreamer_Stream() {
SetLogger(NewDefaultLogger(os.Stdout, DebugLevel))
dsn := "example:example@tcp(localhost:3306)/mysql?charset=utf8mb4"
db, err := sql.Open("mysql", dsn)
if err != nil {
lw.logger().Errorf("open fail. err: %v", err)
return
}
defer db.Close()
db.SetMaxIdleConns(2)
db.SetMaxOpenConns(4)
e := &exampleMysqlTableMapper{db: db}
format, err := e.GetBinlogFormat()
if err != nil {
lw.logger().Errorf("getBinlogFormat fail. err: %v", err)
return
}
if !format.IsRow() {
lw.logger().Errorf("binlog format is not row. format: %v", format)
return
}
pos, err := e.GetBinlogPosition()
if err != nil {
lw.logger().Errorf("GetBinlogPosition fail. err: %v", err)
return
}
r, err := NewRowStreamer(dsn, 1234, e)
if err != nil {
lw.logger().Errorf("NewRowStreamer fail. err: %v", err)
return
}
r.SetStartBinlogPosition(pos)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
processWait := make(chan os.Signal, 1)
signal.Notify(processWait, os.Kill, os.Interrupt)
go func() {
select {
case <-processWait:
cancel()
}
}()
err = r.Stream(ctx, func(t *Transaction) error {
showTransaction(t)
return nil
})
if err != nil {
log.Fatalf("Stream fail. err: %v", err)
return
}
}