pkg/loader: add pkg to load data to mysql #436

Merged: 28 commits, Jan 17, 2019

Commits
32a119f
pkg/loader: add pkg to load data to mysql
july2993 Jan 7, 2019
bc7d1a6
vendor/* update vendor add errgroup
july2993 Jan 7, 2019
c12019c
Merge branch 'master' into hjh/loader
july2993 Jan 7, 2019
9c9fd60
add a update pk&uk case and add comment about mergeByKey
july2993 Jan 8, 2019
296f878
Merge remote-tracking branch 'origin/hjh/loader' into hjh/loader
july2993 Jan 8, 2019
45d68fd
executor.go: Fix forgot to return err and quote column name
july2993 Jan 8, 2019
f85ec8b
bench_test.go set merge or not and add delete&update bench
july2993 Jan 10, 2019
d4f5cf7
loader/* don't always chagne insert -> replace refine some code
july2993 Jan 10, 2019
07cb6c9
Merge branch 'master' into hjh/loader
july2993 Jan 10, 2019
b229f43
add example and refine code
july2993 Jan 13, 2019
b9eb9ce
add metrics of loader, change NewLoader api
july2993 Jan 14, 2019
03d05d1
Merge remote-tracking branch 'origin/hjh/loader' into hjh/loader
july2993 Jan 14, 2019
5f5873b
load.go: merge pk and batch if and only if have pk and no uk
july2993 Jan 14, 2019
a06a5f4
load.go: fix not add item when DetectConflict
july2993 Jan 14, 2019
9b506fc
loader:* remove useless code and use utf8mb4
july2993 Jan 14, 2019
8684e66
load.go: Simplify some log
july2993 Jan 15, 2019
65232ad
add README.md
july2993 Jan 15, 2019
9c0bb48
Update pkg/loader/example_loader_test.go
kennytm Jan 16, 2019
83ba23c
Update pkg/loader/README.md
kennytm Jan 16, 2019
e11c7ac
Update pkg/loader/README.md
kennytm Jan 16, 2019
38850e1
Update pkg/loader/README.md
kennytm Jan 16, 2019
84c637a
Update pkg/loader/merge_test.go
kennytm Jan 16, 2019
81fb7b1
Update pkg/loader/load.go
kennytm Jan 16, 2019
b36ff7c
Update pkg/loader/README.md
kennytm Jan 16, 2019
54aa4d8
Update pkg/loader/README.md
kennytm Jan 16, 2019
c23895f
loader: address comments and use strings.Builder
july2993 Jan 16, 2019
22076e0
merge.go factor some common case out
july2993 Jan 16, 2019
ff844d0
executor.go: Use fmt.Fprintf write data to builder
july2993 Jan 17, 2019
15 changes: 8 additions & 7 deletions drainer/syncer.go
@@ -14,6 +14,7 @@ import (
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
"github.com/pingcap/tidb-binlog/drainer/executor"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/loader"
pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/store/tikv/oracle"
@@ -55,7 +56,7 @@ type Syncer struct {

filter *filter

causality *causality
causality *loader.Causality

lastSyncTime time.Time
}
@@ -70,7 +71,7 @@ func NewSyncer(ctx context.Context, cp checkpoint.CheckPoint, cfg *SyncerConfig)
syncer.ctx, syncer.cancel = context.WithCancel(ctx)
syncer.initCommitTS = cp.TS()
syncer.positions = make(map[string]int64)
syncer.causality = newCausality()
syncer.causality = loader.NewCausality()
syncer.lastSyncTime = time.Now()
syncer.filter = newFilter(formatIgnoreSchemas(cfg.IgnoreSchemas), cfg.DoDBs, cfg.DoTables)

@@ -243,7 +244,7 @@ func (s *Syncer) addJob(job *job) {
if wait {
eventCounter.WithLabelValues("savepoint").Add(1)
s.jobWg.Wait()
s.causality.reset()
s.causality.Reset()
s.savePoint(job.commitTS)
}
}
@@ -270,18 +271,18 @@ func (s *Syncer) resolveCasuality(keys []string) (string, error) {
return keys[0], nil
}

if s.causality.detectConflict(keys) {
if s.causality.DetectConflict(keys) {
if err := s.flushJobs(); err != nil {
return "", errors.Trace(err)
}
s.causality.reset()
s.causality.Reset()
}

if err := s.causality.add(keys); err != nil {
if err := s.causality.Add(keys); err != nil {
return "", errors.Trace(err)
}

return s.causality.get(keys[0]), nil
return s.causality.Get(keys[0]), nil
}

func (s *Syncer) flushJobs() error {
31 changes: 31 additions & 0 deletions pkg/loader/README.md
@@ -0,0 +1,31 @@
loader
======

> Review comment (Contributor): (TBH the code feels more like "syncer" than "loader" 🙃)

A package to load data into MySQL in real time, intended to be used uniformly by *reparo*, *drainer*, etc.


### Getting started
- Example is available via [example_loader_test.go](./example_loader_test.go)

You need to write a translator to use *Loader*, like *SlaveBinlogToTxn* in [translate.go](./translate.go).


## Overview
Loader splits the upstream transaction DML events and loads the data into MySQL concurrently (sharded by primary key or unique key). Causality is resolved by [causality.go](./causality.go).


## Optimization
#### Large Operation
Instead of executing DML statements one by one, we can combine many small operations into a single large one, for example using INSERT statements with multiple VALUES lists to insert several rows at a time. This can be [much faster](https://medium.com/@benmorel/high-speed-inserts-with-mysql-9d3dcd76f723) than inserting row by row.
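As a sketch of this idea, here is a hypothetical helper (the name and signature are illustrative, not part of this PR) that builds one multi-VALUES INSERT using `strings.Builder` and `fmt.Fprintf`, the same tools the later commits in this PR adopt:

```go
package main

import (
	"fmt"
	"strings"
)

// buildMultiValuesInsert combines several rows into one INSERT statement
// with multiple VALUES lists, so the server processes a single large
// statement instead of many small ones.
func buildMultiValuesInsert(table string, cols []string, rows [][]interface{}) (string, []interface{}) {
	var b strings.Builder
	fmt.Fprintf(&b, "INSERT INTO %s(%s) VALUES ", table, strings.Join(cols, ","))

	// one "(?,?,...)" placeholder group per row
	placeholder := "(" + strings.TrimSuffix(strings.Repeat("?,", len(cols)), ",") + ")"
	var args []interface{}
	for i, row := range rows {
		if i > 0 {
			b.WriteString(",")
		}
		b.WriteString(placeholder)
		args = append(args, row...)
	}
	return b.String(), args
}

func main() {
	sql, args := buildMultiValuesInsert("test1", []string{"id", "a1"},
		[][]interface{}{{1, 10}, {2, 20}})
	fmt.Println(sql)
	fmt.Println(len(args))
}
```

The returned statement and flattened argument list can then be passed to a single `db.Exec` call.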

#### Merge by Primary Key
You may want to read about Kafka's [log compaction](https://kafka.apache.org/documentation/#compaction).

For a table with a primary key, we can treat it like a KV store: to reload the table from its change history, we only need the last value of every key.

While syncing data into the downstream in real time, we can take DML events from the upstream in batches and merge them by key. After merging there is only one event per key, so the downstream does not have to execute as many events as the upstream; this also lets us use batch operations.

We must also consider secondary unique keys here; see *execTableBatch* in [executor.go](./executor.go). Currently we only merge by primary key and use batch operations when the table has a primary key and no unique key.
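The merge-by-key idea can be sketched with a minimal, self-contained Go example. The `Event` type and `mergeByKey` helper here are illustrative simplifications of the package's real types; the actual *mergeByKey* logic additionally handles how inserts, updates, and deletes combine:

```go
package main

import "fmt"

type DMLType int

const (
	Insert DMLType = iota
	Update
	Delete
)

type Event struct {
	Tp  DMLType
	Key int // primary-key value
	Val int
}

// mergeByKey keeps only the latest event for each primary key, preserving
// the first-seen order of keys so the output is deterministic.
func mergeByKey(events []Event) []Event {
	latest := make(map[int]Event)
	var order []int
	for _, e := range events {
		if _, ok := latest[e.Key]; !ok {
			order = append(order, e.Key)
		}
		latest[e.Key] = e // later events overwrite earlier ones
	}
	out := make([]Event, 0, len(order))
	for _, k := range order {
		out = append(out, latest[k])
	}
	return out
}

func main() {
	events := []Event{
		{Insert, 1, 1}, {Update, 1, 10}, {Insert, 2, 2}, {Delete, 2, 0}, {Update, 1, 100},
	}
	// five upstream events collapse to one event per key
	fmt.Println(len(mergeByKey(events)))
}
```

With only one event left per key, the batch can then be executed as a few large statements downstream.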



233 changes: 233 additions & 0 deletions pkg/loader/bench_test.go
@@ -0,0 +1,233 @@
package loader

import (
"database/sql"
"fmt"
"sync"
"testing"

_ "github.com/go-sql-driver/mysql"
"github.com/juju/errors"
"github.com/ngaut/log"
)

func getTestDB() (db *sql.DB, err error) {
dsn := "root:@tcp(127.0.0.1:3306)/?charset=utf8&interpolateParams=true&readTimeout=1m&multiStatements=true"
db, err = sql.Open("mysql", dsn)
return
}

func BenchmarkInsertMerge(b *testing.B) {
benchmarkWrite(b, true)
}

func BenchmarkInsertNoMerge(b *testing.B) {
benchmarkWrite(b, false)
}

func BenchmarkUpdateMerge(b *testing.B) {
benchmarkUpdate(b, true)
}

func BenchmarkUpdateNoMerge(b *testing.B) {
benchmarkUpdate(b, false)
}

func BenchmarkDeleteMerge(b *testing.B) {
benchmarkDelete(b, true)
}

func BenchmarkDeleteNoMerge(b *testing.B) {
benchmarkDelete(b, false)
}

func benchmarkUpdate(b *testing.B, merge bool) {
log.SetLevelByString("error")

r, err := newRunner(merge)
if err != nil {
b.Fatal(err)
}

dropTable(r.db, r.loader)
createTable(r.db, r.loader)

loadTable(r.db, r.loader, b.N)

b.ResetTimer()
updateTable(r.db, r.loader, b.N)

r.close()
}

func benchmarkDelete(b *testing.B, merge bool) {
log.SetLevelByString("error")

r, err := newRunner(merge)
if err != nil {
b.Fatal(err)
}

dropTable(r.db, r.loader)
createTable(r.db, r.loader)

loadTable(r.db, r.loader, b.N)

b.ResetTimer()
deleteTable(r.db, r.loader, b.N)

r.close()
}

func benchmarkWrite(b *testing.B, merge bool) {
log.SetLevelByString("error")

r, err := newRunner(merge)
if err != nil {
b.Fatal(err)
}

dropTable(r.db, r.loader)
createTable(r.db, r.loader)

b.ResetTimer()
loadTable(r.db, r.loader, b.N)

r.close()
}

type runner struct {
db *sql.DB
loader *Loader
wg sync.WaitGroup
}

func newRunner(merge bool) (r *runner, err error) {
db, err := getTestDB()
if err != nil {
return nil, errors.Trace(err)
}

loader, err := NewLoader(db, WorkerCount(16), BatchSize(128))
if err != nil {
return nil, errors.Trace(err)
}

loader.merge = merge

r = new(runner)
r.db = db
r.loader = loader

r.wg.Add(1)
go func() {
err := loader.Run()
if err != nil {
log.Fatal(err)
}
r.wg.Done()
}()

go func() {
for range loader.Successes() { // drain the success channel so the loader does not block
}
}()

return
}

func (r *runner) close() {
r.loader.Close()
r.wg.Wait()
}

func createTable(db *sql.DB, loader *Loader) error {
var sql string

sql = "create table test1(id int primary key, a1 int)"
// sql = "create table test1(id int, a1 int, UNIQUE KEY `id` (`id`))"
loader.Input() <- NewDDLTxn("test", "test1", sql)

return nil
}

func dropTable(db *sql.DB, loader *Loader) error {
sql := fmt.Sprintf("drop table if exists test1")
loader.Input() <- NewDDLTxn("test", "test1", sql)
return nil
}

func loadTable(db *sql.DB, loader *Loader, n int) error {
var txns []*Txn
for i := 0; i < n; i++ {
txn := new(Txn)
dml := new(DML)
dml.Database = "test"
dml.Table = "test1"
dml.Tp = InsertDMLType
dml.Values = make(map[string]interface{})
dml.Values["id"] = i
dml.Values["a1"] = i

txn.AppendDML(dml)
txns = append(txns, txn)
}

for _, txn := range txns {
loader.Input() <- txn
}

return nil
}

func updateTable(db *sql.DB, loader *Loader, n int) error {
var txns []*Txn
for i := 0; i < n; i++ {
txn := new(Txn)
dml := new(DML)
dml.Database = "test"
dml.Table = "test1"
dml.Tp = UpdateDMLType
dml.OldValues = make(map[string]interface{})
dml.OldValues["id"] = i
dml.OldValues["a1"] = i

dml.Values = make(map[string]interface{})
dml.Values["id"] = i
dml.Values["a1"] = i * 10

txn.AppendDML(dml)
txns = append(txns, txn)
}

for _, txn := range txns {
loader.Input() <- txn
}

return nil
}

func deleteTable(db *sql.DB, loader *Loader, n int) error {
var txns []*Txn
for i := 0; i < n; i++ {
txn := new(Txn)
dml := new(DML)
dml.Database = "test"
dml.Table = "test1"
dml.Tp = DeleteDMLType
dml.Values = make(map[string]interface{})
dml.Values["id"] = i
dml.Values["a1"] = i

txn.AppendDML(dml)
txns = append(txns, txn)
}

for _, txn := range txns {
loader.Input() <- txn
}

return nil

}
22 changes: 11 additions & 11 deletions drainer/causality.go → pkg/loader/causality.go
@@ -11,30 +11,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package drainer
package loader

import "github.com/juju/errors"

// causality provides a simple mechanism to improve the concurrency of SQLs execution under the premise of ensuring correctness.
// Causality provides a simple mechanism to improve the concurrency of SQLs execution under the premise of ensuring correctness.
// causality groups sqls that maybe contain causal relationships, and syncer executes them linearly.
// if some conflicts exist in more than one groups, then syncer waits all SQLs that are grouped be executed and reset causality.
// this mechanism meets quiescent consistency to ensure correctness.
type causality struct {
type Causality struct {
relations map[string]string
}

func newCausality() *causality {
return &causality{
func NewCausality() *Causality {
return &Causality{
relations: make(map[string]string),
}
}

func (c *causality) add(keys []string) error {
func (c *Causality) Add(keys []string) error {
if len(keys) == 0 {
return nil
}

if c.detectConflict(keys) {
if c.DetectConflict(keys) {
return errors.New("some conflicts in causality, must be resolved")
}
// find causal key
@@ -54,16 +54,16 @@ func (c *causality) add(keys []string) error {
return nil
}

func (c *causality) get(key string) string {
func (c *Causality) Get(key string) string {
return c.relations[key]
}

func (c *causality) reset() {
func (c *Causality) Reset() {
c.relations = make(map[string]string)
}

// detectConflict detects whether there is a conflict
func (c *causality) detectConflict(keys []string) bool {
// DetectConflict detects whether there is a conflict
func (c *Causality) DetectConflict(keys []string) bool {
if len(keys) == 0 {
return false
}
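Assuming the methods above behave as their names and comments suggest, here is a self-contained miniature of the causality idea for illustration (a simplified reimplementation, not the package's exact code):

```go
package main

import (
	"errors"
	"fmt"
)

// Causality maps each key to a representative "group" key. DMLs whose keys
// map to the same group must be executed by the same worker, in order;
// unrelated groups can run concurrently.
type Causality struct {
	relations map[string]string
}

func NewCausality() *Causality {
	return &Causality{relations: make(map[string]string)}
}

// Add registers a set of keys (e.g. the PK and UK values of one DML) as
// causally related, joining an existing group if any key is already known.
func (c *Causality) Add(keys []string) error {
	if len(keys) == 0 {
		return nil
	}
	if c.DetectConflict(keys) {
		return errors.New("some conflicts in causality, must be resolved")
	}
	selected := keys[0]
	for _, k := range keys {
		if rel, ok := c.relations[k]; ok {
			selected = rel // no conflict, so all known keys share one group
		}
	}
	for _, k := range keys {
		c.relations[k] = selected
	}
	return nil
}

func (c *Causality) Get(key string) string { return c.relations[key] }

func (c *Causality) Reset() { c.relations = make(map[string]string) }

// DetectConflict reports whether keys would bridge more than one existing group.
func (c *Causality) DetectConflict(keys []string) bool {
	var seen string
	for _, k := range keys {
		if rel, ok := c.relations[k]; ok {
			if seen != "" && seen != rel {
				return true
			}
			seen = rel
		}
	}
	return false
}

func main() {
	c := NewCausality()
	c.Add([]string{"pk=1", "uk=a"}) // one group
	c.Add([]string{"pk=2"})         // another group
	// a DML touching both groups conflicts, forcing a flush and Reset
	fmt.Println(c.DetectConflict([]string{"uk=a", "pk=2"}))
}
```

On conflict, the syncer (as shown in `resolveCasuality` above) flushes all pending jobs and calls `Reset` before continuing.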