Skip to content

Commit

Permalink
etcd_worker: support update multi-keys in a DataPatch (#1747)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored May 20, 2021
1 parent 06747af commit 3feddf9
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 624 deletions.
7 changes: 5 additions & 2 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"math"
"time"

"github.com/pingcap/ticdc/pkg/orchestrator"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -75,7 +77,8 @@ func newProcessor4Test(ctx context.Context) *processor {
}

func applyPatches(c *check.C, state *changefeedState) {
for _, patch := range state.pendingPatches {
for _, p := range state.pendingPatches {
patch := p.(*orchestrator.SingleDataPatch)
key := &etcd.CDCKey{}
c.Assert(key.Parse(patch.Key.String()), check.IsNil)
var value []byte
Expand Down Expand Up @@ -105,7 +108,7 @@ func applyPatches(c *check.C, state *changefeedState) {
default:
c.Fatal("unexpected key type")
}
newValue, err := patch.Fun(value)
newValue, _, err := patch.Func(value)
c.Assert(err, check.IsNil)
err = state.UpdateCDCKey(key, newValue)
c.Assert(err, check.IsNil)
Expand Down
24 changes: 14 additions & 10 deletions cdc/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (s *globalState) Update(key util.EtcdKey, value []byte, isInit bool) error
return nil
}

func (s *globalState) GetPatches() []*orchestrator.DataPatch {
var pendingPatches []*orchestrator.DataPatch
func (s *globalState) GetPatches() []orchestrator.DataPatch {
var pendingPatches []orchestrator.DataPatch
for _, changefeedState := range s.Changefeeds {
pendingPatches = append(pendingPatches, changefeedState.GetPatches()...)
}
Expand All @@ -82,7 +82,7 @@ type changefeedState struct {
TaskStatus *model.TaskStatus
Workload model.TaskWorkload

pendingPatches []*orchestrator.DataPatch
pendingPatches []orchestrator.DataPatch
}

func newChangeFeedState(id model.ChangeFeedID, captureID model.CaptureID) *changefeedState {
Expand Down Expand Up @@ -182,7 +182,7 @@ func (s *changefeedState) Active() bool {
return s.Info != nil && s.Status != nil && s.TaskStatus != nil
}

func (s *changefeedState) GetPatches() []*orchestrator.DataPatch {
func (s *changefeedState) GetPatches() []orchestrator.DataPatch {
pendingPatches := s.pendingPatches
s.pendingPatches = nil
return pendingPatches
Expand Down Expand Up @@ -240,26 +240,30 @@ func (s *changefeedState) PatchTaskWorkload(fn func(model.TaskWorkload) (model.T
}

func (s *changefeedState) patchAny(key string, tpi interface{}, fn func(interface{}) (interface{}, error)) {
patch := &orchestrator.DataPatch{
patch := &orchestrator.SingleDataPatch{
Key: util.NewEtcdKey(key),
Fun: func(v []byte) ([]byte, error) {
Func: func(v []byte) ([]byte, bool, error) {
var e interface{}
if v != nil {
tp := reflect.TypeOf(tpi)
e = reflect.New(tp.Elem()).Interface()
err := json.Unmarshal(v, e)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
}
ne, err := fn(e)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
if reflect.ValueOf(ne).IsNil() {
return nil, nil
return nil, true, nil
}
return json.Marshal(ne)
v, err = json.Marshal(ne)
if err != nil {
return nil, false, errors.Trace(err)
}
return v, true, nil
},
}
s.pendingPatches = append(s.pendingPatches, patch)
Expand Down
5 changes: 3 additions & 2 deletions cdc/processor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ func newMockReactorStatePatcher(c *check.C, state orchestrator.ReactorState) *mo
func (m *mockReactorStatePatcher) applyPatches() {
patches := m.state.GetPatches()
m.c.Assert(m.state.GetPatches(), check.HasLen, 0)
for _, patch := range patches {
newValue, err := patch.Fun(m.rawState[patch.Key])
for _, p := range patches {
patch := p.(*orchestrator.SingleDataPatch)
newValue, _, err := patch.Func(m.rawState[patch.Key])
m.c.Assert(err, check.IsNil)
err = m.state.Update(patch.Key, newValue, false)
m.c.Assert(err, check.IsNil)
Expand Down
46 changes: 46 additions & 0 deletions pkg/orchestrator/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package orchestrator mainly implements an ETCD worker.
An ETCD worker is used to read/write data from ETCD servers based on snapshots and data patches.
Here is a detailed description of how the ETCD worker works:
                ETCD Servers
                   |     ^
                   |     |
          1. Watch |     | 5. Txn
                   |     |
                   v     |
                 EtcdWorker
                   |     ^
                   |     |
          2. Update|     | 4. DataPatch
          +--------+     +-------+
          |                      |
          |                      |
          v         3.Tick       |
     ReactorState ----------> Reactor
1. EtcdWorker watches the txn modification log from the ETCD servers.
2. EtcdWorker applies the txn modifications received from the ETCD servers by calling the Update function of ReactorState.
3. EtcdWorker calls the Tick function of Reactor, and EtcdWorker makes sure the state of ReactorState is a consistent snapshot of the ETCD servers.
4. Reactor is implemented by the upper layer application. Usually, Reactor will produce DataPatches when the Tick function is called.
EtcdWorker applies all the DataPatches produced by Reactor.
5. EtcdWorker commits a txn to ETCD according to the DataPatches.
The upper layer application, which is a user of EtcdWorker, only needs to implement the Reactor and ReactorState interfaces.
The ReactorState is used to maintain the status of ETCD, and the Reactor can produce different DataPatches according to the ReactorState.
The EtcdWorker makes sure that any ReactorState perceived by the Reactor is a consistent snapshot of the ETCD servers.
*/
package orchestrator
80 changes: 25 additions & 55 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package orchestrator

import (
"bytes"
"context"
"time"

Expand Down Expand Up @@ -88,7 +87,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,

watchCh := worker.client.Watch(ctx1, worker.prefix.String(), clientv3.WithPrefix())
var (
pendingPatches []*DataPatch
pendingPatches []DataPatch
exiting bool
sessionDone <-chan struct{}
)
Expand Down Expand Up @@ -144,7 +143,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,

if len(pendingPatches) > 0 {
// Here we have some patches yet to be uploaded to Etcd.
err := worker.applyPatches(ctx, pendingPatches)
err := worker.applyPatches(ctx, pendingPatches, session)
if err != nil {
if cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) {
continue
Expand Down Expand Up @@ -223,81 +222,52 @@ func (worker *EtcdWorker) syncRawState(ctx context.Context) error {
return nil
}

func mergePatch(patches []*DataPatch) []*DataPatch {
patchMap := make(map[util.EtcdKey][]*DataPatch)
for _, patch := range patches {
patchMap[patch.Key] = append(patchMap[patch.Key], patch)
}
result := make([]*DataPatch, 0, len(patchMap))
for key, patches := range patchMap {
patches := patches
result = append(result, &DataPatch{
Key: key,
Fun: func(old []byte) ([]byte, error) {
for _, patch := range patches {
newValue, err := patch.Fun(old)
if err != nil {
if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) {
continue
}
return nil, err
}
old = newValue
}
return old, nil
},
})
func (worker *EtcdWorker) cloneRawState() map[util.EtcdKey][]byte {
ret := make(map[util.EtcdKey][]byte)
for k, v := range worker.rawState {
cloneV := make([]byte, len(v))
copy(cloneV, v)
ret[util.NewEtcdKey(k.String())] = cloneV
}
return result
return ret
}

func etcdValueEqual(left, right []byte) bool {
if len(left) == 0 && len(right) == 0 {
return (left == nil && right == nil) || (left != nil && right != nil)
}
return bytes.Equal(left, right)
}

func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []*DataPatch) error {
patches = mergePatch(patches)
cmps := make([]clientv3.Cmp, 0, len(patches))
ops := make([]clientv3.Op, 0, len(patches))

func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch, session *concurrency.Session) error {
state := worker.cloneRawState()
changedSet := make(map[util.EtcdKey]struct{})
for _, patch := range patches {
old, ok := worker.rawState[patch.Key]

value, err := patch.Fun(old)
err := patch.Patch(state, changedSet)
if err != nil {
if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) {
continue
}
return errors.Trace(err)
}

}
cmps := make([]clientv3.Cmp, 0, len(changedSet))
ops := make([]clientv3.Op, 0, len(changedSet))
for key := range changedSet {
// make sure someone else has not updated the key after the last snapshot
var cmp clientv3.Cmp
// if ok is false, it means that the key of this patch is not exist in a committed state
if ok {
cmp = clientv3.Compare(clientv3.ModRevision(patch.Key.String()), "<", worker.revision+1)
if _, ok := worker.rawState[key]; ok {
cmp = clientv3.Compare(clientv3.ModRevision(key.String()), "<", worker.revision+1)
} else {
// if ok is false, it means that the key does not exist in the committed state,
// so this comparison is equivalent to asserting that `key` does not exist
cmp = clientv3.Compare(clientv3.ModRevision(patch.Key.String()), "=", 0)
cmp = clientv3.Compare(clientv3.ModRevision(key.String()), "=", 0)
}
cmps = append(cmps, cmp)

if etcdValueEqual(old, value) {
// Ignore patches that produce a new value that is the same as the old value.
continue
}

value := state[key]
var op clientv3.Op
if value != nil {
op = clientv3.OpPut(patch.Key.String(), string(value))
op = clientv3.OpPut(key.String(), string(value))
} else {
op = clientv3.OpDelete(patch.Key.String())
op = clientv3.OpDelete(key.String())
}
ops = append(ops, op)
}

resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
if err != nil {
return errors.Trace(err)
Expand Down
Loading

0 comments on commit 3feddf9

Please sign in to comment.