vreplication: vplayer #4555
Conversation
Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This allows us to avoid the whole mess about WaitForPos asking for specific positions to be exported. Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
Fixes vitessio#3984 Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
Ref #4375
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sugu - this is looking great.
I reviewed what I believed were the core parts.
The most relevant thing I noticed is that I didn't see the new player being hooked into horizontal resharding. It seemed to me that we are still using keyrange there, so it will use the old player.
want2.Fields[1].Type = sqltypes.Blob
want2.Fields[1].Charset = 33
want2.Rows[0][1] = sqltypes.TestValue(sqltypes.Blob, "{\"foo\": \"bar\"}")
if !reflect.DeepEqual(*qr, *want2) {
Why the change in this test? I'm not following that. Is this for the padding issue? Maybe we should point to that issue here too.
This was a regression I had to fix. I was making sure that vreplication worked for Maria 10.3 and ran into this.
@@ -329,5 +328,5 @@ func TestControllerStopPosition(t *testing.T) {
	}
	dbClient.Wait()
-	expectFBCRequest(t, fbc, wantTablet, testPos, nil, &topodatapb.KeyRange{End: []byte{0x80}})
+	expectFBCRequest(t, wantTablet, testPos, nil, &topodatapb.KeyRange{End: []byte{0x80}})
I think these tests are going to be easier to follow if we update the places where MariaDB/0-1-1083 appears as a literal to use testPos. We are not being consistent.
if !vre.isOpen {
	vre.mu.Unlock()
	return errors.New("vreplication engine is closed")
if err := func() error {
Oh nice refactor. This is clearer now.
switch {
case err != nil:
	return err
case len(qr.Rows) == 0:
	return fmt.Errorf("vreplication stream %d not found", id)
case len(qr.Rows) > 1 || len(qr.Rows[0]) != 1:
case len(qr.Rows) > 1 || len(qr.Rows[0]) != 3:
nit - I think it's going to be clearer if we have a type (VReplicationStatus) that represents the result of this query. That way we can try to map the result of this query to that type. If that fails, error. Also below, instead of doing qr.Rows[0][1], we can have something like vReplicationStatus.State.
If I understand correctly, you're saying the result should be returned as a map or a struct? I think it's a good idea. I intend to do this as part of an upcoming refactor: extend DbClient to directly support these high level functions. That way, we don't have to go back and forth by fetching queries from binlogplayer and then interpreting the results.
Yeah, that's what I'm saying. Cool, that sounds good.
if err := ts.CreateShard(ctx, testKeyspace, testShard); err != nil {
	panic(err)
func TestMain(m *testing.M) {
	flag.Parse() // Do not remove this comment, import into google3 depends on it
I don't understand this. What is google3?
google3 is internal jargon for google's source code. Strangely, this is the only other leak I found: google/googletest#1709 :)
	mustSave = true
	break
}
if hasAnotherCommit(items, i, j+1) {
I think is worth adding a comment here that provides some of the context that is in:
Filtered replication often ends up receiving a large number of empty transactions. This is required because the player needs to know the latest position of the source. This allows it to stop at that position if requested. This position also needs to be saved, which will allow an external request to check if a required position has been reached.
However, this leads to a large number of empty commits which not only slow down the replay, but also generate binlog bloat on the target.
In order to mitigate this problem, empty transactions are saved at most once every second. This duration is currently not tunable, but can be made so if necessary.
		return err
	}
}
if err := vp.dbClient.Commit(); err != nil {
What happens if this commit fails, but the vp.setState is set to stopped. Isn't there a risk that this event never gets applied?
Should this commit happen before the set state?
If the commit fails, the retry will eventually reach the position. So, we're fine. We can also do this as a separate commit, but only later. But I think it's better if we also guarantee that we atomically stop on reaching the stop position. Otherwise, we'll be in running state even after reaching stop pos, which feels weird.
Doing it before is definitely not good because if the commit of the event subsequently fails, we'll be forever in stopped state and will never reach the stop pos.
	if posReached {
		return io.EOF
	}
case binlogdatapb.VEventType_FIELD:
What is a field event? Why do we update the plan when this happens? This part of the player I'm not following.
A field event is similar to the Fields of a query result. If a DDL changes anything about the columns of a stream, then we get an update here. For example, the order of columns can change. If so, we have to rebuild the plan to remap the column numbers.
case before != nil && after == nil:
	query = vp.generateDelete(tplan, before)
case before == nil && after == nil:
	// unreachable
Shouldn't we panic or error here if it's unreachable?
It's not invalid per se. The comment says that the vstreamer currently doesn't send this. So, this code is not testable.
case ct.source.KeyRange != nil:
	player := binlogplayer.NewBinlogPlayerKeyRange(dbClient, tablet, ct.source.KeyRange, ct.id, ct.blpStats)
	return player.ApplyBinlogEvents(ctx)
case ct.source.Filter != nil:
I think we are missing updating the legacy_split_clone and split_clone vreplication exec commands so they provide a Filter instead of a KeyRange. That way we can test shard splits with the new player.
Right. I didn't want to change that part yet. However, you can issue a VReplicationExec command to switch back and forth. Once we gain confidence, I can switch them.
Ah ok. That makes sense.
Comments addressed. I've messaged you for one clarification.
For horizontal resharding, the Rule is: Match: "/.*" and Filter: "-80". The test for it is in vstreamer_test.go because it's the one that does that filtering.
Thank you for the clarifications. This looks good to me and should be safe to merge. It won't have any impact on existing flows, as Filter rules are not being used in resharding workflows yet.
The new VPlayer makes many improvements over the old binlogplayer. It's expected to have better performance, and it introduces new features as detailed below.
Filter specification
The new VPlayer functionality is triggered by specifying a `Filter` field in the binlog source proto. The KeyRange and Tables fields must be nil for the Filter to take effect. The Filter is a functional superset of the previous fields.

The filter contains a set of rules. The match for a rule can be a `/match` regexp or an exact-match table name. If a regexp is used, then the `Filter` for the rule can be empty, or it can contain a keyrange. If empty, it's as if we're doing a vertical split. If it contains a keyrange, it's as if we're doing a horizontal split.

If the match contains an exact table name, then you must specify a `select` statement as the filter, the simplest of which is `select * from t`.

The first rule that matches wins.
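The first-match-wins behavior described above can be sketched as follows. This is a simplified stand-in, not the actual vreplication types: `Rule` and `firstMatch` are illustrative names.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Rule is a simplified stand-in for a binlog source filter rule:
// Match is either an exact table name or, with a leading '/', a regexp.
// Filter holds either a keyrange (for regexp rules) or a select statement.
type Rule struct {
	Match  string
	Filter string
}

// firstMatch returns the first rule that matches the table name, or nil.
// The first rule that matches wins.
func firstMatch(rules []Rule, table string) *Rule {
	for i, rule := range rules {
		if strings.HasPrefix(rule.Match, "/") {
			re, err := regexp.Compile(strings.Trim(rule.Match, "/"))
			if err != nil {
				continue
			}
			if re.MatchString(table) {
				return &rules[i]
			}
			continue
		}
		if rule.Match == table {
			return &rules[i]
		}
	}
	return nil
}

func main() {
	rules := []Rule{
		{Match: "dst", Filter: "select * from src"}, // exact table match
		{Match: "/.*", Filter: "-80"},               // regexp match with keyrange
	}
	fmt.Println(firstMatch(rules, "dst").Filter)      // exact rule wins
	fmt.Println(firstMatch(rules, "customer").Filter) // falls through to regexp
}
```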
Query based filtering
The query based filtering allows you to control how the source changes are applied to the target.
Changing the table name
To give the target table a different name than the source, specify a rule where the `Match` is `dst` and the filter is `select * from src`.
.Changing the column list
In order to change the column list, you must specify the explicit list of column names: `select a, b, c from src`.
.Mapping to different column names
In order to map to different column names, you must specify aliases: `select a as a1, b as b1 from src`.
Filtering by keyrange
VPlayer allows you to filter by keyrange using any column and any vindex. This can essentially be used to reshard by a key that's different from the source shard's. For this, you add a where clause to the select: `select * from t1 where in_keyrange(new_key, 'hash', '-80')`.
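The mechanics of `in_keyrange` can be sketched roughly as: hash the column value to a keyspace id, then test it against the range bounds. This is only an illustration; the real `hash` vindex uses its own specific hashing, and sha256 here is just a stand-in.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// inKeyRange sketches what in_keyrange(col, vindex, '-80') checks:
// the column value is hashed to a keyspace id, which is then compared
// against the [start, end) range. Empty start/end means unbounded.
func inKeyRange(val, start, end []byte) bool {
	sum := sha256.Sum256(val) // stand-in for the real vindex hash
	ksid := sum[:8]
	if len(start) > 0 && bytes.Compare(ksid, start) < 0 {
		return false
	}
	if len(end) > 0 && bytes.Compare(ksid, end) >= 0 {
		return false
	}
	return true
}

func main() {
	lower := inKeyRange([]byte("42"), nil, []byte{0x80})  // range '-80'
	upper := inKeyRange([]byte("42"), []byte{0x80}, nil)  // range '80-'
	// A keyspace id falls in exactly one half of a split.
	fmt.Println(lower != upper)
}
```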
.Select expressions
The select expressions can invoke the following three functions: `month`, `day` and `hour`. This is just a proof-of-concept. We'll need to refine this functionality.

Rollups
You can request real-time rollups on some columns of the source table. Example: `select id, sum(price) as amount, count(*) item_count from t group by id`.
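One way a rollup rule like the example above could be applied is to turn each source row into an upsert that accumulates into the aggregate column. This is a sketch of the idea, not the vplayer's actual query generation; all names here are illustrative.

```go
package main

import "fmt"

// buildRollupInsert sketches how a rollup such as
//   select id, sum(price) as amount from t group by id
// might be applied on the target: each incoming row becomes an upsert
// that accumulates the delta into the aggregate column.
func buildRollupInsert(table, keyCol, aggCol string, key, delta int64) string {
	return fmt.Sprintf(
		"insert into %s(%s, %s) values (%d, %d) on duplicate key update %s = %s + %d",
		table, keyCol, aggCol, key, delta, aggCol, aggCol, delta)
}

func main() {
	// A source row (id=1, price=25) accumulates into the rollup table.
	fmt.Println(buildRollupInsert("rollup", "id", "amount", 1, 25))
}
```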
.group by
The group by expression is generally meant to be used for rollups. But it's slightly more versatile. If a group by expression intentionally fails to mention a non-aggregate expression, then the latest value of that column is updated into the target table.
If a group by references all columns of a select list, then the vplayer applies all changes as `insert ignore`, where the first unique value wins for all rows.

Performance improvements
Empty transactions
Filtered replication often ends up receiving a large number of empty transactions. This is required because the player needs to know the latest position of the source. This allows it to stop at that position if requested. This position also needs to be saved, which will allow an external request to check if a required position has been reached.
However, this leads to a large number of empty commits which not only slow down the replay, but also generate binlog bloat on the target.
In order to mitigate this problem, empty transactions are saved at most once every second. This duration is currently not tunable, but can be made so if necessary.
Batched commits
VReplication is currently single-threaded. If the number of incoming transactions exceeds the rate at which they can be applied, it can fall behind the master. A new in-memory relay-log-style facility has been introduced: it accumulates all incoming events. If VReplication falls behind, multiple events accumulate, and the player applies all of them as a single transaction. To prevent memory overflow, the size of the relay log is limited to 10K. If the limit is reached, we wait until the accumulated events are pulled out by the apply thread.
The 10K is exposed as a flag.
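The relay-log batching can be sketched as a bounded producer/consumer buffer: the receiver blocks when the cap is reached, and the apply thread drains everything that accumulated in one batch. A minimal sketch, assuming string events for simplicity; the real implementation works on binlog event protos.

```go
package main

import (
	"fmt"
	"sync"
)

// relayLog is a bounded in-memory event buffer. Send blocks when the
// cap is reached; Fetch drains everything accumulated so far, which
// the apply thread can commit as a single transaction.
type relayLog struct {
	mu      sync.Mutex
	notFull *sync.Cond
	items   []string
	maxSize int
}

func newRelayLog(maxSize int) *relayLog {
	rl := &relayLog{maxSize: maxSize}
	rl.notFull = sync.NewCond(&rl.mu)
	return rl
}

// Send appends an event, waiting if the relay log is full.
func (rl *relayLog) Send(event string) {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	for len(rl.items) >= rl.maxSize {
		rl.notFull.Wait()
	}
	rl.items = append(rl.items, event)
}

// Fetch returns all accumulated events and wakes blocked senders.
func (rl *relayLog) Fetch() []string {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	items := rl.items
	rl.items = nil
	rl.notFull.Broadcast()
	return items
}

func main() {
	rl := newRelayLog(10000) // 10K cap, exposed as a flag in the PR
	rl.Send("begin")
	rl.Send("insert")
	rl.Send("commit")
	fmt.Println(len(rl.Fetch())) // all accumulated events drained at once
	fmt.Println(len(rl.Fetch())) // empty after draining
}
```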
Large transactions
Large transactions will now get split into smaller parts and shipped to the player, which will then re-assemble the statements into a full transaction.
Other changes
DDL handling
We introduce four modes for DDL handling, specified by setting the `OnDDL` field of the binlog source. If a DDL stops the stream, setting its state back to `Running` makes vreplication continue.

If a DDL in the source does not match any of the requested tables, then that DDL is not sent.
Retries on lock errors
If there is a deadlock or lock wait error, the transaction is retried after 1 second; the old binlog player only retried on deadlock.
BINARY columns
Issue #3984 is now fixed. Values for binary columns are now padded with zeroes to match the length.
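The padding fix boils down to extending binary values with zero bytes to the column's declared length, matching how MySQL stores BINARY(n) values. A minimal sketch; `padBinary` is an illustrative name, not the actual function.

```go
package main

import (
	"bytes"
	"fmt"
)

// padBinary zero-pads a value to the BINARY column's declared length,
// so that replicated values compare equal to MySQL's stored BINARY(n)
// representation (the subject of issue #3984).
func padBinary(val []byte, colLen int) []byte {
	if len(val) >= colLen {
		return val
	}
	return append(val, bytes.Repeat([]byte{0}, colLen-len(val))...)
}

func main() {
	// A 2-byte value destined for a BINARY(4) column gains two 0x00 bytes.
	fmt.Printf("%q\n", padBinary([]byte("ab"), 4))
}
```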
TIMESTAMP
The binlog player used a complex expression to map timestamp (UTC) values back to the session time zone. VPlayer instead sets the session time zone to UTC, which allows it to send those values as is.
PK based updates
Updates now use the primary key of the target table. This allows inexact column types like float to be correctly replicated, as long as they are not part of the primary key.
For tables with no PK, all columns are treated as PK.
Minimal updates
If the value for a column has not changed, then it's not added to the update statement. If no relevant value has changed, then no update is issued.
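The minimal-update idea can be sketched as follows: only changed columns enter the SET clause, and an unchanged row produces no statement at all. All names and the naive quoting are illustrative; the real vplayer builds statements from its table plan with proper encoding.

```go
package main

import (
	"fmt"
	"strings"
)

// minimalUpdate compares before/after images column by column and
// emits a SET clause only for columns whose values changed. If no
// column changed, it returns "" and no update should be issued.
func minimalUpdate(table string, cols, before, after []string, pkClause string) string {
	var sets []string
	for i, col := range cols {
		if before[i] != after[i] {
			sets = append(sets, fmt.Sprintf("%s = '%s'", col, after[i]))
		}
	}
	if len(sets) == 0 {
		return "" // nothing changed: skip the update entirely
	}
	return fmt.Sprintf("update %s set %s where %s", table, strings.Join(sets, ", "), pkClause)
}

func main() {
	cols := []string{"name", "price"}
	// Only price changed, so only price appears in the SET clause.
	fmt.Println(minimalUpdate("t", cols, []string{"a", "10"}, []string{"a", "20"}, "id = 1"))
	// Nothing changed: no update is generated.
	fmt.Printf("%q\n", minimalUpdate("t", cols, []string{"a", "10"}, []string{"a", "10"}, "id = 1"))
}
```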