Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay/syncer: relay notifies syncer of new write to reduce sync latency #2225

Merged
merged 24 commits into from
Oct 27, 2021

Conversation

D3Hunter
Copy link
Contributor

@D3Hunter D3Hunter commented Oct 15, 2021

What problem does this PR solve?

What is changed and how it works?

  • add Listener function to relay, it notifies after writing events successfully
  • add EventNotifier to local binlog reader(used by syncer), and change polling file change to waiting on write events to reduce sync latency

Check List

Tests

  • Unit test

@ti-chi-bot
Copy link
Member

ti-chi-bot commented Oct 15, 2021

[REVIEW NOTIFICATION]

This pull request has been approved by:

  • Ehco1996
  • lichunzhu

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

@ti-chi-bot ti-chi-bot added the contribution For contributor label Oct 15, 2021
@D3Hunter
Copy link
Contributor Author

/cc @lichunzhu

@ti-chi-bot ti-chi-bot requested a review from lichunzhu October 15, 2021 04:23
@lance6716 lance6716 added needs-cherry-pick-release-2.0 This PR should be cherry-picked to release-2.0. Remove this label after cherry-picked to release-2.0 needs-update-release-note This PR should be added into release notes. Remove this label once the release notes are updated labels Oct 15, 2021
dm/worker/relay.go Show resolved Hide resolved
dm/worker/subtask.go Outdated Show resolved Hide resolved
dm/worker/subtask_holder.go Show resolved Hide resolved
dm/worker/subtask_holder.go Show resolved Hide resolved
relay/relay.go Outdated Show resolved Hide resolved
syncer/syncer.go Outdated Show resolved Hide resolved
loader/lightning.go Outdated Show resolved Hide resolved
pkg/streamer/file.go Show resolved Hide resolved
pkg/streamer/file.go Show resolved Hide resolved
relay/relay.go Outdated Show resolved Hide resolved
pkg/streamer/reader.go Outdated Show resolved Hide resolved
pkg/streamer/reader.go Show resolved Hide resolved
syncer/syncer.go Outdated Show resolved Hide resolved
// OnEvent implements relay.Listener
// only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here
// as relay event need to broadcast to every syncer(most subtask have a syncer).
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems e *replication.BinlogEvent is not used here, why we need this param ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a event listener
the parameter is not used now, but it's added for completeness(observer pattern), may used later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
func (h *subTaskHolder) OnEvent(_ *replication.BinlogEvent) {

pkg/streamer/file_test.go Show resolved Hide resolved
pkg/streamer/file.go Show resolved Hide resolved
@D3Hunter
Copy link
Contributor Author

@Ehco1996

but if upstream have no write for a long time, does this cause reader read and parse same binlog file again and again (every watcherInterval), and this may cause much IO waste

we parse from the last position, so parseFile in go-mysql returns EOF if there nothing to read, for the FORMAT_DESC_EVENT(go-mysql parse it every time), page cache of os will handle it.

it would be better if we can keep the written size of each binlog file, so we'll know whether it's ended, maybe we can add it later.

Copy link
Contributor

@lichunzhu lichunzhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This control path looks a bit long to me.
Relay receives a new event -> Subtask holder -> Relay reader
The way I expect it to work is that notifications are only coordinated within the relay module, and the subtask holder only needs to request a reader from the relay.
Is this PR a transitional design or a final solution?

relay/relay.go Show resolved Hide resolved
@@ -44,12 +46,22 @@ const (
waitRelayCatchupTimeout = 30 * time.Second
)

type relayNotifier struct {
// ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times
ch chan interface{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ch chan interface{}
ch chan struct{}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no gains

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has some cost to convert struct{}{} to interface{}

pkg/streamer/file.go Show resolved Hide resolved
@D3Hunter
Copy link
Contributor Author

D3Hunter commented Oct 19, 2021

@lichunzhu

The way I expect it to work is that notifications are only coordinated within the relay module, and the subtask holder only needs to request a reader from the relay.

i think it should be separate into relay and local binlog reader as current implementation do, that's 2 different things

  • relay responsible for reading from upstream and write into local file, and notify those are interested in event changes.
  • local binlog reader read binlog locally

Is this PR a transitional design or a final solution?

this PR is for optimizing latency with relay on, don't want to introduce too much refactoring

@lance6716
Copy link
Collaborator

[code=38032:class=dm-master:scope=internal:level=high], Message: some error occurs in dm-worker: ErrCode:11106 ErrClass:\"functional\" ErrScope:\"internal\" ErrLevel:\"high\" Message:\"fail to restart streamer controller: the given relay log pos (dm-integration-test-s5hcg-7zsh8-bin.000002, 11232) of meta config is too big, please check it again\" Workaround:\"If the size of the corresponding binlog file has exceeded 4GB, please follow the solution in https://docs.pingcap.com/tidb-data-migration/stable/error-handling#the-relay-unit-throws-error-event-from--in--diff-from-passed-in-event--or-a-replication-task-is-interrupted-with-failing-to-get-or-parse-binlog-errors-like-get-binlog-error-error-1236-hy000-and-binlog-checksum-mismatch-data-may-be-corrupted-returned\" , Workaround: Please execute `query-status` to check status.

seems a BUG revealed in CI

@Ehco1996 Ehco1996 added this to the v2.1.0 milestone Oct 26, 2021
@D3Hunter
Copy link
Contributor Author

D3Hunter commented Oct 26, 2021

[code=38032:class=dm-master:scope=internal:level=high], Message: some error occurs in dm-worker: ErrCode:11106 ErrClass:\"functional\" ErrScope:\"internal\" ErrLevel:\"high\" Message:\"fail to restart streamer controller: the given relay log pos (dm-integration-test-s5hcg-7zsh8-bin.000002, 11232) of meta config is too big, please check it again\" Workaround:\"If the size of the corresponding binlog file has exceeded 4GB, please follow the solution in https://docs.pingcap.com/tidb-data-migration/stable/error-handling#the-relay-unit-throws-error-event-from--in--diff-from-passed-in-event--or-a-replication-task-is-interrupted-with-failing-to-get-or-parse-binlog-errors-like-get-binlog-error-error-1236-hy000-and-binlog-checksum-mismatch-data-may-be-corrupted-returned\" , Workaround: Please execute `query-status` to check status.

seems a BUG revealed in CI

it's an unstable case, not a bug. it happens when we

  • enable-relay, syncer'll restart and checks whether binlog file size < current sync pos, but in this case relay hasn't catch up with current sync pos due to higher load on CI, so syncer reports an error and pause.
  • then we try to stop the task, and this case fail. actually, syncer'll try again later and success, so it can be stopped.

it's hard to reproduce this locally(add sleep(30 ms) after each relay write do reproduce it.)

@Ehco1996
Copy link
Contributor

/run-unit-tests
/run-integration-test

@Ehco1996
Copy link
Contributor

but in this case relay hasn't catch up with current sync pos

maybe we can add some msg in workaround such as wait and retry 😂

dm/worker/server_test.go Outdated Show resolved Hide resolved
dm/worker/source_worker_test.go Outdated Show resolved Hide resolved
@@ -44,12 +46,22 @@ const (
waitRelayCatchupTimeout = 30 * time.Second
)

type relayNotifier struct {
// ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times
ch chan interface{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has some cost to convert struct{}{} to interface{}

dm/worker/subtask.go Show resolved Hide resolved
pkg/streamer/file_test.go Show resolved Hide resolved
pkg/streamer/reader_test.go Show resolved Hide resolved
pkg/streamer/reader_test.go Show resolved Hide resolved
pkg/streamer/reader_test.go Outdated Show resolved Hide resolved
// OnEvent implements relay.Listener
// only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here
// as relay event need to broadcast to every syncer(most subtask have a syncer).
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
func (h *subTaskHolder) OnEvent(_ *replication.BinlogEvent) {

relay/relay.go Show resolved Hide resolved
dm/worker/subtask_test.go Outdated Show resolved Hide resolved
dm/worker/subtask.go Outdated Show resolved Hide resolved
syncer/syncer.go Show resolved Hide resolved
Copy link
Contributor

@lichunzhu lichunzhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM except for the unresolved comments

relay/relay_test.go Show resolved Hide resolved
pkg/streamer/file_test.go Show resolved Hide resolved
@ti-chi-bot ti-chi-bot added the status/LGT1 One reviewer already commented LGTM label Oct 27, 2021
@ti-chi-bot ti-chi-bot added status/LGT2 Two reviewers already commented LGTM, ready for merge and removed status/LGT1 One reviewer already commented LGTM labels Oct 27, 2021
@Ehco1996
Copy link
Contributor

/merge

@ti-chi-bot
Copy link
Member

This pull request has been accepted and is ready to merge.

Commit hash: 99b2ed7

@ti-chi-bot
Copy link
Member

In response to a cherrypick label: new pull request created: #2269.

@D3Hunter D3Hunter deleted the relay-notify branch October 27, 2021 07:27
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
contribution For contributor needs-cherry-pick-release-2.0 This PR should be cherry-picked to release-2.0. Remove this label after cherry-picked to release-2.0 needs-update-release-note This PR should be added into release notes. Remove this label once the release notes are updated size/XXL status/can-merge status/LGT2 Two reviewers already commented LGTM, ready for merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants