pkg/loader: add pkg to load data to mysql #436
/run-all-tests
we did it in
update bench test result?
A bench test of bench_test.go, run locally with downstream MySQL 5.7. worker-count & batch-size: 16, 128. After changing worker-count & batch-size: 32, 256.
@GregoryIan PTAL
Use the same table schema to bench with and without merge.
		return
	}

func quoteSchema(schema string, table string) string {
There is a problem with the TiDB version dependency, so let's keep this simple func.
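The body of quoteSchema is not shown in this thread. As a rough sketch only, assuming the helper simply backtick-quotes both names and doubles any embedded backtick (escapeName is a hypothetical helper, not confirmed by the diff), a dependency-free version could look like this:

```go
package main

import (
	"fmt"
	"strings"
)

// escapeName is a hypothetical helper: it doubles any backtick inside an
// identifier so the name can be wrapped in backticks safely.
func escapeName(name string) string {
	return strings.ReplaceAll(name, "`", "``")
}

// quoteSchema returns the backtick-quoted `schema`.`table` name.
// Assumption: this mirrors what the helper in the PR is meant to do.
func quoteSchema(schema string, table string) string {
	return fmt.Sprintf("`%s`.`%s`", escapeName(schema), escapeName(table))
}

func main() {
	fmt.Println(quoteSchema("test", "t1"))
}
```

Keeping it this small avoids pulling in a TiDB package just for identifier quoting.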
Two important things about it:
- Export an input channel and a successful channel to control load.
  How does the user know that all TXNs have finished? Compare them one by one, or sort them and take one successful TXN; then we know the TXNs before it have already finished.
  It's better to have a README.md and an example.
- Lack of metrics and log.
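To make the channel suggestion concrete, here is a minimal sketch. All names (Txn, Loader, Input, Successes, loadAll) are hypothetical, and it assumes the loader acknowledges transactions in input order, so the last acknowledged TXN implies every earlier one has finished:

```go
package main

import "fmt"

// Txn is a hypothetical transaction handed to the loader.
type Txn struct {
	ID int64
}

// Loader is a hypothetical loader with an input channel and a successes channel.
type Loader struct {
	input     chan *Txn
	successes chan *Txn
}

func NewLoader(buf int) *Loader {
	return &Loader{input: make(chan *Txn, buf), successes: make(chan *Txn, buf)}
}

// Input is where the caller sends transactions to be loaded.
func (l *Loader) Input() chan<- *Txn { return l.input }

// Successes yields each transaction after it has been applied.
func (l *Loader) Successes() <-chan *Txn { return l.successes }

// Run applies transactions in input order (the apply step is stubbed out)
// and closes the successes channel once the input is closed and drained.
func (l *Loader) Run() {
	defer close(l.successes)
	for txn := range l.input {
		// A real loader would execute the DMLs of txn downstream here.
		l.successes <- txn
	}
}

// loadAll feeds txns to the loader and returns the ID of the last
// acknowledged transaction, or -1 if none was acknowledged.
func loadAll(l *Loader, txns []*Txn) int64 {
	go l.Run()
	go func() {
		for _, t := range txns {
			l.Input() <- t
		}
		close(l.input)
	}()
	last := int64(-1)
	for t := range l.Successes() {
		last = t.ID
	}
	return last
}

func main() {
	l := NewLoader(2)
	fmt.Println(loadAll(l, []*Txn{{ID: 1}, {ID: 2}, {ID: 3}}))
}
```

If acknowledgements could arrive out of order, the caller would instead need to track the set of pending IDs, which is the "compare one by one" option from the comment.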
pkg/loader/load.go
Outdated
	var singleDMLs []*DML

	for tableName, tableDMLs := range tables {
		if len(tableDMLs[0].primaryKeys()) > 0 && s.merge {
As we discussed, we should consider unique-key relations across multiple rows. I think we should check that the primary key count = 1 and the unique key count = 0.
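One possible reading of that check, as a sketch: merge only when the table has a primary key and no additional unique keys. The tableInfo type and canMerge function are hypothetical and not taken from the diff:

```go
package main

import "fmt"

// tableInfo is a hypothetical summary of a table's keys.
type tableInfo struct {
	primaryKey []string   // columns of the primary key; empty if the table has none
	uniqueKeys [][]string // columns of each additional unique index
}

// canMerge reports whether DMLs for the table may be merged by primary key.
// Assumption: merging is safe only when the primary key is the sole
// uniqueness constraint, so no other unique key can relate two rows.
func canMerge(info tableInfo, mergeEnabled bool) bool {
	return mergeEnabled && len(info.primaryKey) > 0 && len(info.uniqueKeys) == 0
}

func main() {
	pkOnly := tableInfo{primaryKey: []string{"id"}}
	withUK := tableInfo{primaryKey: []string{"id"}, uniqueKeys: [][]string{{"email"}}}
	fmt.Println(canMerge(pkOnly, true), canMerge(withUK, true))
}
```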
see
tidb-binlog/pkg/loader/executor.go
Line 238 in 03d05d1
// we merge dmls by primary key, after merge by key, we
But I can add the limit, or add an option to enable this feature, for now. Maybe we use it after more testing and validation, or when we hit a performance problem.
update: 5f5873b
Ping @july2993, we can speed up this PR.
Added an example and metrics. @GregoryIan @WangXiangUSTC PTAL
Co-Authored-By: july2993 <july2993@gmail.com>
@GregoryIan @kennytm @WangXiangUSTC PTAL
Rest LGTM
pkg/loader/executor.go
Outdated
	tableName := inserts[0].Table

	builder := new(strings.Builder)
	builder.WriteString(fmt.Sprintf("REPLACE INTO %s(%s) VALUES ", quoteSchema(dbName, tableName), buildColumnList(info.columns)))
@july2993 This isn't resolved 🙃
@WangXiangUSTC @kennytm PTAL
LGTM
What problem does this PR solve?
A package for loading data into MySQL in real time. It can be used in tests and reparo, and in drainer later.
#438 contains only the vendor update; you can review and merge that first.
What is changed and how it works?
See tests/kafka/kafka.go for an example.
Most of the logic is the same as in drainer, but if a table has a PK, the loader merges the records that share the same PK (see merge.go). After the merge there is only one record per PK, so we can easily do a bulk insert. This is also beneficial if the upstream updates some hot key frequently.
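The merge-then-bulk-insert idea can be sketched as follows. This is a simplified illustration, not the logic of merge.go: the DML type here is hypothetical, it assumes a single-column integer PK, and it just keeps the last change per PK (the real merge has to handle more cases, such as updates that change the PK):

```go
package main

import (
	"fmt"
	"strings"
)

type DMLType int

const (
	Upsert DMLType = iota // insert or update, applied with REPLACE INTO
	Delete
)

// DML is a simplified, hypothetical row change keyed by a single-column PK.
type DML struct {
	Tp     DMLType
	PK     int64
	Values []interface{}
}

// mergeByPK keeps only the last change for each primary key, preserving the
// order in which keys were first seen.
func mergeByPK(dmls []DML) []DML {
	index := make(map[int64]int) // PK -> position in merged
	var merged []DML
	for _, d := range dmls {
		if i, ok := index[d.PK]; ok {
			merged[i] = d // a later change overrides the earlier one
			continue
		}
		index[d.PK] = len(merged)
		merged = append(merged, d)
	}
	return merged
}

// buildReplace builds one bulk REPLACE statement with placeholders and its
// arguments from the Upsert rows. It returns "" if there is nothing to write.
func buildReplace(table string, columns []string, rows []DML) (string, []interface{}) {
	holder := "(" + strings.TrimSuffix(strings.Repeat("?,", len(columns)), ",") + ")"
	var b strings.Builder
	var args []interface{}
	n := 0
	for _, r := range rows {
		if r.Tp != Upsert {
			continue
		}
		if n > 0 {
			b.WriteByte(',')
		}
		b.WriteString(holder)
		args = append(args, r.Values...)
		n++
	}
	if n == 0 {
		return "", nil
	}
	head := fmt.Sprintf("REPLACE INTO %s(%s) VALUES ", table, strings.Join(columns, ","))
	return head + b.String(), args
}

func main() {
	merged := mergeByPK([]DML{
		{Tp: Upsert, PK: 1, Values: []interface{}{1, "a"}},
		{Tp: Upsert, PK: 2, Values: []interface{}{2, "b"}},
		{Tp: Upsert, PK: 1, Values: []interface{}{1, "c"}}, // hot key updated again
	})
	sql, args := buildReplace("`test`.`t`", []string{"id", "v"}, merged)
	fmt.Println(sql, args)
}
```

Because each PK appears at most once after the merge, the rows in one batch do not conflict with each other, which is what makes a single multi-row statement safe in this sketch.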
More things we could maybe do:
Check List
Tests
Code changes
Side effects
Related changes