diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index c6ba5de8235..3a886df4ad2 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -20,6 +20,8 @@ import ( "math" "time" + "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -75,7 +77,8 @@ func newProcessor4Test(ctx context.Context) *processor { } func applyPatches(c *check.C, state *changefeedState) { - for _, patch := range state.pendingPatches { + for _, p := range state.pendingPatches { + patch := p.(*orchestrator.SingleDataPatch) key := &etcd.CDCKey{} c.Assert(key.Parse(patch.Key.String()), check.IsNil) var value []byte @@ -105,7 +108,7 @@ func applyPatches(c *check.C, state *changefeedState) { default: c.Fatal("unexpected key type") } - newValue, err := patch.Fun(value) + newValue, _, err := patch.Func(value) c.Assert(err, check.IsNil) err = state.UpdateCDCKey(key, newValue) c.Assert(err, check.IsNil) diff --git a/cdc/processor/state.go b/cdc/processor/state.go index 0176a01d200..a3a52817968 100644 --- a/cdc/processor/state.go +++ b/cdc/processor/state.go @@ -65,8 +65,8 @@ func (s *globalState) Update(key util.EtcdKey, value []byte, isInit bool) error return nil } -func (s *globalState) GetPatches() []*orchestrator.DataPatch { - var pendingPatches []*orchestrator.DataPatch +func (s *globalState) GetPatches() []orchestrator.DataPatch { + var pendingPatches []orchestrator.DataPatch for _, changefeedState := range s.Changefeeds { pendingPatches = append(pendingPatches, changefeedState.GetPatches()...) } @@ -82,7 +82,7 @@ type changefeedState struct { TaskStatus *model.TaskStatus Workload model.TaskWorkload - pendingPatches []*orchestrator.DataPatch + pendingPatches []orchestrator.DataPatch } func newChangeFeedState(id model.ChangeFeedID, captureID model.CaptureID) *changefeedState { @@ -182,7 +182,7 @@ func (s *changefeedState) Active() bool { return s.Info != nil && s.Status != nil && s.TaskStatus != nil } -func (s *changefeedState) GetPatches() []*orchestrator.DataPatch { +func (s *changefeedState) GetPatches() []orchestrator.DataPatch { pendingPatches := s.pendingPatches s.pendingPatches = nil return pendingPatches @@ -240,26 +240,30 @@ func (s *changefeedState) PatchTaskWorkload(fn func(model.TaskWorkload) (model.T } func (s *changefeedState) patchAny(key string, tpi interface{}, fn func(interface{}) (interface{}, error)) { - patch := &orchestrator.DataPatch{ + patch := &orchestrator.SingleDataPatch{ Key: util.NewEtcdKey(key), - Fun: func(v []byte) ([]byte, error) { + Func: func(v []byte) ([]byte, bool, error) { var e interface{} if v != nil { tp := reflect.TypeOf(tpi) e = reflect.New(tp.Elem()).Interface() err := json.Unmarshal(v, e) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } } ne, err := fn(e) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } if reflect.ValueOf(ne).IsNil() { - return nil, nil + return nil, true, nil } - return json.Marshal(ne) + v, err = json.Marshal(ne) + if err != nil { + return nil, false, errors.Trace(err) + } + return v, true, nil }, } s.pendingPatches = append(s.pendingPatches, patch) diff --git a/cdc/processor/state_test.go b/cdc/processor/state_test.go index b8021958970..33f83f96495 100644 --- a/cdc/processor/state_test.go +++ b/cdc/processor/state_test.go @@ -52,8 +52,9 @@ func newMockReactorStatePatcher(c *check.C, state orchestrator.ReactorState) *mo func (m *mockReactorStatePatcher) applyPatches() { patches := m.state.GetPatches() m.c.Assert(m.state.GetPatches(), check.HasLen, 0) - for _, patch := range patches { - newValue, err := patch.Fun(m.rawState[patch.Key]) + for _, p := range patches { + patch := p.(*orchestrator.SingleDataPatch) + newValue, _, err := patch.Func(m.rawState[patch.Key]) m.c.Assert(err, check.IsNil) err = m.state.Update(patch.Key, newValue, false) m.c.Assert(err, check.IsNil) diff --git a/pkg/orchestrator/doc.go b/pkg/orchestrator/doc.go new file mode 100644 index 00000000000..ead0578d865 --- /dev/null +++ b/pkg/orchestrator/doc.go @@ -0,0 +1,46 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package orchestrator mainly implements a ETCD worker. +A ETCD worker is used to read/write data from ETCD servers based on snapshot and data patches. +Here is a detailed description of how the ETCD worker works: + + ETCD Servers + | ^ + | | + 1. Watch | | 5. Txn + | | + v | + EtcdWorker + | ^ + | | + 2. Update| | 4. DataPatch + +--------+ +-------+ + | | + | | + v 3.Tick | + ReactorState ----------> Reactor + +1. EtcdWorker watches the txn modification log from ETCD servers +2. EtcdWorker updates the txn modification listened from ETCD servers by calling the Update function of ReactorState +3. EtcdWorker calls the Tick function of Reactor, and EtcdWorker make sure the state of ReactorState is a consistent snapshot of ETCD servers +4. Reactor is implemented by the upper layer application. Usually, Reactor will produce DataPatches when the Tick function called + EtcdWorker apply all the DataPatches produced by Reactor +5. EtcdWorker commits a txn to ETCD according to DataPatches + +The upper layer application which is a user of EtcdWorker only need to implement Reactor and ReactorState interface. +The ReactorState is used to maintenance status of ETCD, and the Reactor can produce DataPatches differently according to the ReactorState. +The EtcdWorker make sure any ReactorState which perceived by Reactor must be a consistent snapshot of ETCD servers. +*/ +package orchestrator diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 22203c565ac..38c5a1fe93e 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -14,7 +14,6 @@ package orchestrator import ( - "bytes" "context" "time" @@ -88,7 +87,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, watchCh := worker.client.Watch(ctx1, worker.prefix.String(), clientv3.WithPrefix()) var ( - pendingPatches []*DataPatch + pendingPatches []DataPatch exiting bool sessionDone <-chan struct{} ) @@ -144,7 +143,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if len(pendingPatches) > 0 { // Here we have some patches yet to be uploaded to Etcd. - err := worker.applyPatches(ctx, pendingPatches) + err := worker.applyPatches(ctx, pendingPatches, session) if err != nil { if cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) { continue @@ -223,81 +222,52 @@ func (worker *EtcdWorker) syncRawState(ctx context.Context) error { return nil } -func mergePatch(patches []*DataPatch) []*DataPatch { - patchMap := make(map[util.EtcdKey][]*DataPatch) - for _, patch := range patches { - patchMap[patch.Key] = append(patchMap[patch.Key], patch) - } - result := make([]*DataPatch, 0, len(patchMap)) - for key, patches := range patchMap { - patches := patches - result = append(result, &DataPatch{ - Key: key, - Fun: func(old []byte) ([]byte, error) { - for _, patch := range patches { - newValue, err := patch.Fun(old) - if err != nil { - if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { - continue - } - return nil, err - } - old = newValue - } - return old, nil - }, - }) +func (worker *EtcdWorker) cloneRawState() map[util.EtcdKey][]byte { + ret := make(map[util.EtcdKey][]byte) + for k, v := range worker.rawState { + cloneV := make([]byte, len(v)) + copy(cloneV, v) + ret[util.NewEtcdKey(k.String())] = cloneV } - return result + return ret } -func etcdValueEqual(left, right []byte) bool { - if len(left) == 0 && len(right) == 0 { - return (left == nil && right == nil) || (left != nil && right != nil) - } - return bytes.Equal(left, right) -} - -func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []*DataPatch) error { - patches = mergePatch(patches) - cmps := make([]clientv3.Cmp, 0, len(patches)) - ops := make([]clientv3.Op, 0, len(patches)) - +func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch, session *concurrency.Session) error { + state := worker.cloneRawState() + changedSet := make(map[util.EtcdKey]struct{}) for _, patch := range patches { - old, ok := worker.rawState[patch.Key] - - value, err := patch.Fun(old) + err := patch.Patch(state, changedSet) if err != nil { if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { continue } return errors.Trace(err) } - + } + cmps := make([]clientv3.Cmp, 0, len(changedSet)) + ops := make([]clientv3.Op, 0, len(changedSet)) + for key := range changedSet { // make sure someone else has not updated the key after the last snapshot var cmp clientv3.Cmp - // if ok is false, it means that the key of this patch is not exist in a committed state - if ok { - cmp = clientv3.Compare(clientv3.ModRevision(patch.Key.String()), "<", worker.revision+1) + if _, ok := worker.rawState[key]; ok { + cmp = clientv3.Compare(clientv3.ModRevision(key.String()), "<", worker.revision+1) } else { + // if ok is false, it means that the key of this patch is not exist in a committed state // this compare is equivalent to `patch.Key` is not exist - cmp = clientv3.Compare(clientv3.ModRevision(patch.Key.String()), "=", 0) + cmp = clientv3.Compare(clientv3.ModRevision(key.String()), "=", 0) } cmps = append(cmps, cmp) - if etcdValueEqual(old, value) { - // Ignore patches that produce a new value that is the same as the old value. - continue - } - + value := state[key] var op clientv3.Op if value != nil { - op = clientv3.OpPut(patch.Key.String(), string(value)) + op = clientv3.OpPut(key.String(), string(value)) } else { - op = clientv3.OpDelete(patch.Key.String()) + op = clientv3.OpDelete(key.String()) } ops = append(ops, op) } + resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit() if err != nil { return errors.Trace(err) diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index 38554eb5099..48dcd1334f8 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -108,7 +108,7 @@ type simpleReactorState struct { values [][]int sum int deltas []*delta - patches []*DataPatch + patches []DataPatch } var keyParseRegexp = regexp.MustCompile(regexp.QuoteMeta(testEtcdKeyPrefix) + `/(.+)`) @@ -118,17 +118,21 @@ func (s *simpleReactorState) Get(i1, i2 int) int { } func (s *simpleReactorState) Inc(i1, i2 int) { - patch := &DataPatch{ + patch := &SingleDataPatch{ Key: util.NewEtcdKey(testEtcdKeyPrefix + "/" + strconv.Itoa(i1)), - Fun: func(old []byte) ([]byte, error) { + Func: func(old []byte) ([]byte, bool, error) { var oldJSON []int err := json.Unmarshal(old, &oldJSON) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } oldJSON[i2]++ - return json.Marshal(oldJSON) + newValue, err := json.Marshal(oldJSON) + if err != nil { + return nil, false, errors.Trace(err) + } + return newValue, true, nil }, } @@ -136,10 +140,10 @@ func (s *simpleReactorState) Inc(i1, i2 int) { } func (s *simpleReactorState) SetSum(sum int) { - patch := &DataPatch{ + patch := &SingleDataPatch{ Key: util.NewEtcdKey(testEtcdKeyPrefix + "/sum"), - Fun: func(_ []byte) ([]byte, error) { - return []byte(strconv.Itoa(sum)), nil + Func: func(_ []byte) ([]byte, bool, error) { + return []byte(strconv.Itoa(sum)), true, nil }, } @@ -187,7 +191,7 @@ func (s *simpleReactorState) Update(key util.EtcdKey, value []byte, isInit bool) return nil } -func (s *simpleReactorState) GetPatches() []*DataPatch { +func (s *simpleReactorState) GetPatches() []DataPatch { ret := s.patches s.patches = nil return ret @@ -289,8 +293,8 @@ func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) er return nil } -func (s *intReactorState) GetPatches() []*DataPatch { - return []*DataPatch{} +func (s *intReactorState) GetPatches() []DataPatch { + return []DataPatch{} } type linearizabilityReactor struct { @@ -359,7 +363,7 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) { type commonReactorState struct { state map[string]string - pendingPatches []*DataPatch + pendingPatches []DataPatch } func (s *commonReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error { @@ -367,14 +371,14 @@ func (s *commonReactorState) Update(key util.EtcdKey, value []byte, isInit bool) return nil } -func (s *commonReactorState) AppendPatch(key util.EtcdKey, fun PatchFunc) { - s.pendingPatches = append(s.pendingPatches, &DataPatch{ - Key: key, - Fun: fun, +func (s *commonReactorState) AppendPatch(key util.EtcdKey, fun func(old []byte) (newValue []byte, changed bool, err error)) { + s.pendingPatches = append(s.pendingPatches, &SingleDataPatch{ + Key: key, + Func: fun, }) } -func (s *commonReactorState) GetPatches() []*DataPatch { +func (s *commonReactorState) GetPatches() []DataPatch { pendingPatches := s.pendingPatches s.pendingPatches = nil return pendingPatches @@ -389,23 +393,20 @@ type finishedReactor struct { func (r *finishedReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) { r.state = state.(*commonReactorState) if r.tickNum < 2 { - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("abc")...) - return + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return append(old, []byte("abc")...), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("123")...) - return + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return append(old, []byte("123")...), true, nil }) r.tickNum++ return r.state, nil } - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("fin")...) - return + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return append(old, []byte("fin")...), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - return nil, nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return nil, true, nil }) return r.state, cerrors.ErrReactorFinished } @@ -449,39 +450,32 @@ type coverReactor struct { func (r *coverReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) { r.state = state.(*commonReactorState) if r.tickNum < 2 { - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("abc")...) - return + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return append(old, []byte("abc")...), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("123")...) - return + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return append(old, []byte("123")...), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("cba")...) - return + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return append(old, []byte("cba")...), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("321")...) - return + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return append(old, []byte("321")...), true, nil }) r.tickNum++ return r.state, nil } - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("fin")...) - return + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return append(old, []byte("fin")...), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("fin")...) - return + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return append(old, []byte("fin")...), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - return nil, nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return nil, true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("fin")...) - return + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return append(old, []byte("fin")...), true, nil }) return r.state, cerrors.ErrReactorFinished } @@ -527,14 +521,14 @@ type emptyTxnReactor struct { func (r *emptyTxnReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) { r.state = state.(*commonReactorState) if r.tickNum == 0 { - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - return []byte("abc"), nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return []byte("abc"), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - return []byte("123"), nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return []byte("123"), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - return nil, nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return nil, true, nil }) r.tickNum++ return r.state, nil @@ -546,20 +540,20 @@ func (r *emptyTxnReactor) Tick(ctx context.Context, state ReactorState) (nextSta return nil, errors.Trace(err) } - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - return []byte("123"), nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return []byte("123"), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - return nil, nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return nil, true, nil }) r.tickNum++ return r.state, nil } - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - return nil, nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return nil, true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - return []byte("123"), nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return []byte("123"), true, nil }) return r.state, cerrors.ErrReactorFinished } @@ -604,30 +598,30 @@ type emptyOrNilReactor struct { func (r *emptyOrNilReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) { r.state = state.(*commonReactorState) if r.tickNum == 0 { - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - return []byte(""), nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return []byte(""), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - return nil, nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return nil, true, nil }) r.tickNum++ return r.state, nil } if r.tickNum == 1 { - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - return nil, nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return nil, true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - return []byte(""), nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return []byte(""), true, nil }) r.tickNum++ return r.state, nil } - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, err error) { - return []byte(""), nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key1"), func(old []byte) (newValue []byte, changed bool, err error) { + return []byte(""), true, nil }) - r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, err error) { - return nil, nil + r.state.AppendPatch(util.NewEtcdKey(r.prefix+"/key2"), func(old []byte) (newValue []byte, changed bool, err error) { + return nil, true, nil }) return r.state, cerrors.ErrReactorFinished } @@ -661,148 +655,3 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) { err = cli.Unwrap().Close() c.Assert(err, check.IsNil) } - -func (s *etcdWorkerSuite) TestMergePatches(c *check.C) { - defer testleak.AfterTest(c)() - testCases := []struct { - state map[util.EtcdKey][]byte - patches []*DataPatch - }{ - { - state: map[util.EtcdKey][]byte{}, - patches: []*DataPatch{}, - }, - { - state: map[util.EtcdKey][]byte{ - util.NewEtcdKey("key1"): []byte("aa"), - }, - patches: []*DataPatch{ - { - Key: util.NewEtcdKey("key1"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("bb")...) - return - }, - }, - { - Key: util.NewEtcdKey("key1"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("cc")...) - return - }, - }, - { - Key: util.NewEtcdKey("key1"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("dd")...) - return - }, - }, - }, - }, - { - state: map[util.EtcdKey][]byte{ - util.NewEtcdKey("key1"): []byte("aa"), - }, - patches: []*DataPatch{ - { - Key: util.NewEtcdKey("key1"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("bb")...) - return - }, - }, - { - Key: util.NewEtcdKey("key2"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("cc")...) - return - }, - }, - { - Key: util.NewEtcdKey("key1"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("dd")...) - return - }, - }, - { - Key: util.NewEtcdKey("key2"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("ee")...) - return - }, - }, - }, - }, - { - state: map[util.EtcdKey][]byte{ - util.NewEtcdKey("key1"): []byte("aa"), - }, - patches: []*DataPatch{ - { - Key: util.NewEtcdKey("key1"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("bb")...) - err = cerrors.ErrEtcdIgnore - return - }, - }, - { - Key: util.NewEtcdKey("key2"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("cc")...) - return - }, - }, - { - Key: util.NewEtcdKey("key1"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("dd")...) - return - }, - }, - { - Key: util.NewEtcdKey("key2"), - Fun: func(old []byte) (newValue []byte, err error) { - newValue = append(old, []byte("ee")...) - err = cerrors.ErrEtcdIgnore - return - }, - }, - }, - }, - } - - applyPatches := func(state map[util.EtcdKey][]byte, patches []*DataPatch) map[util.EtcdKey][]byte { - // clone state map - clonedState := make(map[util.EtcdKey][]byte, len(state)) - for k, v := range state { - clonedState[k] = v - } - // apply patches - for _, p := range patches { - newValue, err := p.Fun(clonedState[p.Key]) - if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { - continue - } - c.Assert(err, check.IsNil) - clonedState[p.Key] = newValue - } - return clonedState - } - for _, tc := range testCases { - mergedPatches := mergePatch(tc.patches) - c.Assert(applyPatches(tc.state, mergedPatches), check.DeepEquals, applyPatches(tc.state, tc.patches)) - } -} - -func (s *etcdWorkerSuite) TestEtcdValueEqual(c *check.C) { - defer testleak.AfterTest(c)() - c.Assert(etcdValueEqual(nil, nil), check.IsTrue) - c.Assert(etcdValueEqual(nil, []byte{}), check.IsFalse) - c.Assert(etcdValueEqual([]byte{}, nil), check.IsFalse) - c.Assert(etcdValueEqual([]byte{}, []byte{}), check.IsTrue) - c.Assert(etcdValueEqual([]byte{11}, []byte{11}), check.IsTrue) - c.Assert(etcdValueEqual([]byte{11}, []byte{12}), check.IsFalse) -} diff --git a/pkg/orchestrator/interfaces.go b/pkg/orchestrator/interfaces.go index f0f5290e6f6..a6a967ded3d 100644 --- a/pkg/orchestrator/interfaces.go +++ b/pkg/orchestrator/interfaces.go @@ -25,14 +25,9 @@ type Reactor interface { Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) } -// PatchFunc should be a pure function that returns a new value given the old value. -// The function is called each time the EtcdWorker initiates an Etcd transaction. -type PatchFunc = func(old []byte) (newValue []byte, err error) - -// DataPatch represents an update to a given Etcd key -type DataPatch struct { - Key util.EtcdKey - Fun PatchFunc +// DataPatch represents an update of state +type DataPatch interface { + Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error } // ReactorState models the Etcd state of a reactor @@ -42,5 +37,40 @@ type ReactorState interface { // GetPatches is called by EtcdWorker, and should return a slice of data patches that represents the changes // that a Reactor wants to apply to Etcd. - GetPatches() []*DataPatch + GetPatches() []DataPatch +} + +// SingleDataPatch represents an update to a given Etcd key +type SingleDataPatch struct { + Key util.EtcdKey + // Func should be a pure function that returns a new value given the old value. + // The function is called each time the EtcdWorker initiates an Etcd transaction. + Func func(old []byte) (newValue []byte, changed bool, err error) +} + +// Patch implements the DataPatch interface +func (s *SingleDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error { + value := valueMap[s.Key] + newValue, changed, err := s.Func(value) + if err != nil { + return err + } + if !changed { + return nil + } + changedSet[s.Key] = struct{}{} + if newValue == nil { + delete(valueMap, s.Key) + } else { + valueMap[s.Key] = newValue + } + return nil +} + +// MultiDatePatch represents an update to many keys +type MultiDatePatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error + +// Patch implements the DataPatch interface +func (m MultiDatePatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error { + return m(valueMap, changedSet) } diff --git a/pkg/orchestrator/jsonstate/json_reactor_state.go b/pkg/orchestrator/jsonstate/json_reactor_state.go deleted file mode 100644 index d16620b72fc..00000000000 --- a/pkg/orchestrator/jsonstate/json_reactor_state.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package jsonstate - -import ( - "encoding/json" - "reflect" - - "github.com/pingcap/errors" - cerrors "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/orchestrator" - "github.com/pingcap/ticdc/pkg/orchestrator/util" -) - -// JSONReactorState models a single key whose value is a json object. -type JSONReactorState struct { - // jsonData stores an object serializable to a valid `value` corresponding to `key`. - jsonData interface{} - // modifiedJSONData is the modified snapshot of jsonData that has not been uploaded to Etcd. - modifiedJSONData interface{} - key util.EtcdKey - isUpdatedByReactor bool - patches []JSONPatchFunc -} - -// JSONPatchFunc is a function that updates an object that is serializable to JSON. -// It is okay to modify the input and return the input itself. -// Use ErrEtcdTryAgain and ErrEtcdIgnore to trigger Etcd transaction retries and to give up this update. -type JSONPatchFunc = func(data interface{}) (newData interface{}, err error) - -// NewJSONReactorState returns a new JSONReactorState. -// `data` needs to be a pointer to an object serializable in JSON. -func NewJSONReactorState(key string, data interface{}) (*JSONReactorState, error) { - tp := reflect.TypeOf(data) - if tp.Kind() != reflect.Ptr { - return nil, errors.Errorf("expected pointer type, got %T", data) - } - - copied := reflect.New(tp.Elem()).Interface() - deepCopy(data, copied) - - return &JSONReactorState{ - jsonData: data, - modifiedJSONData: copied, - key: util.NewEtcdKey(key), - isUpdatedByReactor: false, - }, nil -} - -// Update implements the ReactorState interface. -func (s *JSONReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error { - if key != s.key { - return nil - } - - err := json.Unmarshal(value, s.jsonData) - if err != nil { - return errors.Trace(err) - } - - deepCopy(s.jsonData, s.modifiedJSONData) - s.isUpdatedByReactor = true - return nil -} - -// GetPatches implements the ReactorState interface.[]*orchestrator.DataPatch -func (s *JSONReactorState) GetPatches() []*orchestrator.DataPatch { - if len(s.patches) == 0 { - return []*orchestrator.DataPatch{} - } - - // We need to let the PatchFunc capture the array of JSONPatchFunc's, - // and let the DataPatch be the sole object referring to those JSONPatchFunc's, - // so that JSONReactorState does not have to worry about when to clean them up. - subPatches := make([]JSONPatchFunc, len(s.patches)) - copy(subPatches, s.patches) - s.patches = s.patches[:0] - - dataPatch := &orchestrator.DataPatch{ - Key: s.key, - Fun: func(old []byte) ([]byte, error) { - tp := reflect.TypeOf(s.jsonData) - oldStruct := reflect.New(tp.Elem()).Interface() - err := json.Unmarshal(old, oldStruct) - if err != nil { - return nil, errors.Trace(err) - } - - for _, f := range subPatches { - newStruct, err := f(oldStruct) - if err != nil { - if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { - continue - } - return nil, errors.Trace(err) - } - oldStruct = newStruct - } - - newBytes, err := json.Marshal(oldStruct) - if err != nil { - return nil, errors.Trace(err) - } - - return newBytes, nil - }, - } - - return []*orchestrator.DataPatch{dataPatch} -} - -// Inner returns a copy of the snapshot of the state. -// DO NOT modify the returned object. The modified object will not be persisted. -func (s *JSONReactorState) Inner() interface{} { - return s.modifiedJSONData -} - -// AddUpdateFunc accepts a JSONPatchFunc that updates the managed JSON-serializable object. -// If multiple JSONPatchFunc's are added within a Tick, they are applied in the order in which AddUpdateFunc has been called. -func (s *JSONReactorState) AddUpdateFunc(f JSONPatchFunc) { - s.patches = append(s.patches, f) -} - -// TODO optimize for performance -func deepCopy(a, b interface{}) { - byt, _ := json.Marshal(a) - _ = json.Unmarshal(byt, b) -} diff --git a/pkg/orchestrator/jsonstate/json_reactor_state_test.go b/pkg/orchestrator/jsonstate/json_reactor_state_test.go deleted file mode 100644 index 1d9c72726d7..00000000000 --- a/pkg/orchestrator/jsonstate/json_reactor_state_test.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package jsonstate - -import ( - "context" - "testing" - "time" - - "github.com/pingcap/check" - "github.com/pingcap/log" - cerrors "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/etcd" - "github.com/pingcap/ticdc/pkg/orchestrator" - "github.com/pingcap/ticdc/pkg/util/testleak" - "github.com/prometheus/client_golang/prometheus" - "go.etcd.io/etcd/clientv3" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -const ( - testEtcdKeyPrefix = "/cdc_etcd_worker_test" -) - -func Test(t *testing.T) { check.TestingT(t) } - -var _ = check.Suite(&jsonReactorStateSuite{}) - -type jsonReactorStateSuite struct { -} - -type simpleJSONRecord struct { - A int `json:"a"` - B int `json:"b"` - C int `json:"c"` -} - -type simpleJSONReactor struct { - state *JSONReactorState - oldVal int - id int -} - -func (r *simpleJSONReactor) Tick(_ context.Context, state orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { - if r.oldVal >= 100 { - return r.state, cerrors.ErrReactorFinished - } - newState := state.(*JSONReactorState) - r.state = newState - - snapshot := r.state.Inner().(*simpleJSONRecord) - oldVal := 0 - switch r.id { - case 0: - oldVal = snapshot.A - r.state.AddUpdateFunc(func(data interface{}) (newData interface{}, err error) { - data.(*simpleJSONRecord).A++ - return data, nil - }) - case 1: - oldVal = snapshot.B - r.state.AddUpdateFunc(func(data interface{}) (newData interface{}, err error) { - data.(*simpleJSONRecord).B++ - return data, nil - }) - case 2: - oldVal = snapshot.C - r.state.AddUpdateFunc(func(data interface{}) (newData interface{}, err error) { - data.(*simpleJSONRecord).C++ - return data, nil - }) - } - if r.oldVal != oldVal { - log.Panic("validation failed", zap.Int("id", r.id), zap.Int("expected", r.oldVal), zap.Int("actual", oldVal)) - } - r.oldVal++ - return r.state, nil -} - -func (s *jsonReactorStateSuite) TestSimpleJSONRecord(c *check.C) { - defer testleak.AfterTest(c)() - dir := c.MkDir() - url, etcdServer, err := etcd.SetupEmbedEtcd(dir) - c.Assert(err, check.IsNil) - defer etcdServer.Close() - - newClient := func() *etcd.Client { - rawCli, err := clientv3.NewFromURLs([]string{url.String()}) - c.Check(err, check.IsNil) - return etcd.Wrap(rawCli, map[string]prometheus.Counter{}) - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1) - defer cancel() - - cli := newClient() - _, err = cli.Put(ctx, testEtcdKeyPrefix+"/json", `{"a": 0, "b": 0, "c": 0}`) - c.Assert(err, check.IsNil) - - errg, ctx := errgroup.WithContext(ctx) - for i := 0; i < 3; i++ { - reactor := &simpleJSONReactor{ - state: nil, - oldVal: 0, - id: i, - } - - initState, err := NewJSONReactorState(testEtcdKeyPrefix+"/json", &simpleJSONRecord{}) - c.Assert(err, check.IsNil) - - etcdWorker, err := orchestrator.NewEtcdWorker(newClient(), testEtcdKeyPrefix, reactor, initState) - c.Assert(err, check.IsNil) - - errg.Go(func() error { - err := etcdWorker.Run(ctx, nil, 10*time.Millisecond) - if err != nil { - log.Error("etcdWorker returned error", zap.Error(err)) - } - return err - }) - } - - err = errg.Wait() - c.Assert(err, check.IsNil) -} - -func (s *jsonReactorStateSuite) TestNotPointerError(c *check.C) { - defer testleak.AfterTest(c)() - - _, err := NewJSONReactorState("/json", simpleJSONRecord{}) - c.Assert(err, check.NotNil) -} diff --git a/pkg/orchestrator/reactor_state_tester.go b/pkg/orchestrator/reactor_state_tester.go index 7f2a92acc29..994b16d4ddb 100644 --- a/pkg/orchestrator/reactor_state_tester.go +++ b/pkg/orchestrator/reactor_state_tester.go @@ -14,80 +14,88 @@ package orchestrator import ( + "github.com/pingcap/check" "github.com/pingcap/errors" - cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/orchestrator/util" ) // ReactorStateTester is a helper struct for unit-testing an implementer of ReactorState type ReactorStateTester struct { + c *check.C state ReactorState kvEntries map[string]string } // NewReactorStateTester creates a new ReactorStateTester -func NewReactorStateTester(state ReactorState, initKVEntries map[string]string) *ReactorStateTester { +func NewReactorStateTester(c *check.C, state ReactorState, initKVEntries map[string]string) *ReactorStateTester { + if initKVEntries == nil { + initKVEntries = make(map[string]string) + } + for k, v := range initKVEntries { + err := state.Update(util.NewEtcdKey(k), []byte(v), true) + c.Assert(err, check.IsNil) + } return &ReactorStateTester{ + c: c, state: state, kvEntries: initKVEntries, } } -// UpdateKeys is used to update keys in the mocked kv-store. -func (t *ReactorStateTester) UpdateKeys(updatedKeys map[string][]byte) error { - for key, value := range updatedKeys { - k := util.NewEtcdKey(key) - err := t.state.Update(k, value, false) - if err != nil { - return errors.Trace(err) - } - - if value != nil { - t.kvEntries[key] = string(value) - } else { - delete(t.kvEntries, key) - } +// Update is used to update keys in the mocked kv-store. +func (t *ReactorStateTester) Update(key string, value []byte) error { + k := util.NewEtcdKey(key) + err := t.state.Update(k, value, false) + if err != nil { + return errors.Trace(err) + } + if value != nil { + t.kvEntries[key] = string(value) + } else { + delete(t.kvEntries, key) } - return nil } // ApplyPatches calls the GetPatches method on the ReactorState and apply the changes to the mocked kv-store. func (t *ReactorStateTester) ApplyPatches() error { patches := t.state.GetPatches() - mergedPatches := mergePatch(patches) - for _, patch := range mergedPatches { - old, ok := t.kvEntries[patch.Key.String()] - var ( - newBytes []byte - err error - ) - if ok { - newBytes, err = patch.Fun([]byte(old)) - } else { - newBytes, err = patch.Fun(nil) - } - if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { - continue - } + tmpKVEntries := make(map[util.EtcdKey][]byte) + for k, v := range t.kvEntries { + tmpKVEntries[util.NewEtcdKey(k)] = []byte(v) + } + changedSet := make(map[util.EtcdKey]struct{}) + for _, patch := range patches { + err := patch.Patch(tmpKVEntries, changedSet) if err != nil { - return errors.Trace(err) + return err } - err = t.state.Update(patch.Key, newBytes, false) + } + for k := range changedSet { + err := t.state.Update(k, tmpKVEntries[k], false) if err != nil { - return errors.Trace(err) + return err } - if newBytes == nil { - delete(t.kvEntries, patch.Key.String()) - continue + if value := tmpKVEntries[k]; value != nil { + t.kvEntries[k.String()] = string(value) + } else { + delete(t.kvEntries, k.String()) } - t.kvEntries[patch.Key.String()] = string(newBytes) } - return nil } +// MustApplyPatches calls ApplyPatches and must successfully +func (t *ReactorStateTester) MustApplyPatches() { + t.c.Assert(t.ApplyPatches(), check.IsNil) +} + +// MustUpdate calls Update and must successfully +func (t *ReactorStateTester) MustUpdate(key string, value []byte) { + t.c.Assert(t.Update(key, value), check.IsNil) +} + // KVEntries returns the contents of the mocked KV store. func (t *ReactorStateTester) KVEntries() map[string]string { return t.kvEntries diff --git a/testing_utils/cdc_state_checker/state.go b/testing_utils/cdc_state_checker/state.go index da7b84f1ac5..e46f17aa3aa 100644 --- a/testing_utils/cdc_state_checker/state.go +++ b/testing_utils/cdc_state_checker/state.go @@ -227,6 +227,6 @@ func (s *cdcReactorState) Update(key util.EtcdKey, value []byte, isInit bool) er return nil } -func (s *cdcReactorState) GetPatches() []*orchestrator.DataPatch { +func (s *cdcReactorState) GetPatches() []orchestrator.DataPatch { return nil }