Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fb/latency(cdc): agent to be table state awared, to handle different p2p messages. #5820

Merged
merged 32 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1700414
add table struct to agent.
3AceShowHand Jun 8, 2022
31e46aa
agent add table state machine.
3AceShowHand Jun 9, 2022
b9865a8
simplify coordinator.
3AceShowHand Jun 9, 2022
da11e6e
agent to be state awared.
3AceShowHand Jun 9, 2022
fc16d5f
call IsRemoveTableFinished to clean table resource from processor.
3AceShowHand Jun 9, 2022
6b90294
fix message header.
3AceShowHand Jun 10, 2022
b4da4bd
fix agent.
3AceShowHand Jun 10, 2022
ed14860
prepare new agent ready.
3AceShowHand Jun 10, 2022
39f741e
fix some test.
3AceShowHand Jun 10, 2022
218f194
add basic ut.
3AceShowHand Jun 10, 2022
1451b86
fix agent handle message ut.
3AceShowHand Jun 11, 2022
a673b6a
add all test.
3AceShowHand Jun 12, 2022
c5c3bcf
fix agent handle stopping.
3AceShowHand Jun 13, 2022
583369c
adjust pipeline table state.
3AceShowHand Jun 13, 2022
304e16a
refine the agent.
3AceShowHand Jun 13, 2022
e61aeb9
introduce tableManager.
3AceShowHand Jun 13, 2022
46b0d0a
fix all tests.
3AceShowHand Jun 13, 2022
40d98f7
add some new test.
3AceShowHand Jun 13, 2022
32cfe0d
fix log.
3AceShowHand Jun 13, 2022
9f1e1ce
fix by make check
3AceShowHand Jun 13, 2022
6182c47
fix by review comment.
3AceShowHand Jun 13, 2022
f84c0c3
fix by review comment.
3AceShowHand Jun 13, 2022
a1c7573
fix ut.
3AceShowHand Jun 13, 2022
9f55756
fix check.
3AceShowHand Jun 13, 2022
e08f305
agent fix heartbeat does not refresh each tick.
3AceShowHand Jun 13, 2022
95d60ef
remove scheduler log.
3AceShowHand Jun 14, 2022
14858d7
fix ut.
3AceShowHand Jun 14, 2022
62e38ea
Merge branch 'fb/latency' into agent-handle-table-state
3AceShowHand Jun 14, 2022
962f324
fix some test.
3AceShowHand Jun 14, 2022
c86a97a
fix ut.
3AceShowHand Jun 14, 2022
dfcb823
rename
3AceShowHand Jun 14, 2022
a147e91
fix by check.
3AceShowHand Jun 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,31 @@ type TableState int32

// TableState for table pipeline
const (
TableStateUnknown TableState = iota
// TableStateAbsent means the table not found
TableStateAbsent
// TableStatePreparing indicate that the table is preparing connecting to regions
TableStatePreparing TableState = iota
TableStatePreparing
// TableStatePrepared means the first `Resolved Ts` is received.
TableStatePrepared
// TableStateReplicating means that sink is consuming data from the sorter, and replicating it to downstream
// TableStateReplicating means that sink is consuming data from the sorter,
// and replicating it to downstream
TableStateReplicating
// TableStateStopping means the table is stopping, but not guaranteed yet.
// at the moment, this state is not used, only keep aligned with `schedulepb.TableStateStopping`
TableStateStopping
// TableStateStopped means sink stop all works.
// TableStateStopped means sink stop all works, but the table resource not released yet.
TableStateStopped
// TableStateAbsent means the table not found
TableStateAbsent
)

var tableStatusStringMap = map[TableState]string{
TableStateUnknown: "Unknown",
TableStateAbsent: "Absent",
TableStatePreparing: "Preparing",
TableStatePrepared: "Prepared",
TableStateReplicating: "Replicating",
TableStateStopping: "Stopping",
TableStateStopped: "Stopped",
TableStateAbsent: "Absent",
}

func (s TableState) String() string {
Expand Down Expand Up @@ -92,7 +96,7 @@ type TablePipeline interface {
AsyncStop(targetTs model.Ts) bool

// Start the sink consume data from the given `ts`
Start(ts model.Ts) bool
Start(ts model.Ts)

// Workload returns the workload of this table
Workload() model.WorkloadInfo
Expand Down
4 changes: 1 addition & 3 deletions cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,11 @@ func (t *tableActor) Wait() {
_ = t.wg.Wait()
}

func (t *tableActor) Start(ts model.Ts) bool {
func (t *tableActor) Start(ts model.Ts) {
if atomic.CompareAndSwapInt32(&t.sortNode.started, 0, 1) {
t.sortNode.startTsCh <- ts
close(t.sortNode.startTsCh)
return true
}
return false
}

// MemoryConsumption return the memory consumption in bytes
Expand Down
3 changes: 1 addition & 2 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,8 @@ func (m *mockTablePipeline) Wait() {
// do nothing
}

func (m *mockTablePipeline) Start(ts model.Ts) bool {
func (m *mockTablePipeline) Start(ts model.Ts) {
m.sinkStartTs = ts
return true
}

// MemoryConsumption return the memory consumption in bytes
Expand Down
Loading