Skip to content

Commit

Permalink
*: resolve conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen committed Dec 23, 2021
1 parent d539b8b commit d901d13
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 5,127 deletions.
151 changes: 42 additions & 109 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"go.etcd.io/etcd/clientv3/concurrency"
Expand Down Expand Up @@ -76,62 +75,29 @@ func (s *Server) handleResignOwner(w http.ResponseWriter, req *http.Request) {
writeError(w, http.StatusBadRequest, cerror.ErrSupportPostOnly.GenWithStackByArgs())
return
}
if config.NewReplicaImpl {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
err := s.captureV2.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.AsyncStop()
return nil
})
handleOwnerResp(w, err)
return
}
s.ownerLock.RLock()
if s.owner == nil {
if s.capture == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
s.ownerLock.RUnlock()
return
}
// Resign is a complex process that needs to be synchronized because
// it happens in two separate goroutines
//
// Imagine that we have goroutines A and B
// A1. Notify the owner to exit
// B1. The owner exits gracefully
// A2. Delete the leader key until the owner has exited
// B2. Restart to campaign
//
// A2 must occur between B1 and B2, so we register the Resign process
// as the stepDown function which is called when the owner exited.
s.owner.Close(req.Context(), func(ctx context.Context) error {
return s.capture.Resign(ctx)
err := s.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.AsyncStop()
return nil
})
s.ownerLock.RUnlock()
s.setOwner(nil)
handleOwnerResp(w, nil)
handleOwnerResp(w, err)
return
}

func (s *Server) handleChangefeedAdmin(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
writeError(w, http.StatusBadRequest, cerror.ErrSupportPostOnly.GenWithStackByArgs())
return
}
if !config.NewReplicaImpl {
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
if s.owner == nil {
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

if s.capture == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

err := req.ParseForm()
Expand Down Expand Up @@ -160,14 +126,12 @@ func (s *Server) handleChangefeedAdmin(w http.ResponseWriter, req *http.Request)
Type: model.AdminJobType(typ),
Opts: opts,
}
if config.NewReplicaImpl {
err = s.captureV2.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.EnqueueJob(job)
return nil
})
} else {
err = s.owner.EnqueueJob(job)
}

err = s.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.EnqueueJob(job)
return nil
})

handleOwnerResp(w, err)
}

Expand All @@ -176,19 +140,10 @@ func (s *Server) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request
writeError(w, http.StatusBadRequest, cerror.ErrSupportPostOnly.GenWithStackByArgs())
return
}
if !config.NewReplicaImpl {
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
if s.owner == nil {
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
if s.capture == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

err := req.ParseForm()
Expand All @@ -202,14 +157,11 @@ func (s *Server) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID))
return
}
if config.NewReplicaImpl {
err = s.captureV2.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.TriggerRebalance(changefeedID)
return nil
})
} else {
s.owner.TriggerRebalance(changefeedID)
}
err = s.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.TriggerRebalance(changefeedID)
return nil
})

handleOwnerResp(w, err)
}

Expand All @@ -218,19 +170,11 @@ func (s *Server) handleMoveTable(w http.ResponseWriter, req *http.Request) {
writeError(w, http.StatusBadRequest, cerror.ErrSupportPostOnly.GenWithStackByArgs())
return
}
if !config.NewReplicaImpl {
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
if s.owner == nil {
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

if s.capture == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

err := req.ParseForm()
Expand All @@ -257,14 +201,12 @@ func (s *Server) handleMoveTable(w http.ResponseWriter, req *http.Request) {
cerror.ErrAPIInvalidParam.GenWithStack("invalid tableID: %s", tableIDStr))
return
}
if config.NewReplicaImpl {
err = s.captureV2.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.ManualSchedule(changefeedID, to, tableID)
return nil
})
} else {
s.owner.ManualSchedule(changefeedID, to, tableID)
}

err = s.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.ManualSchedule(changefeedID, to, tableID)
return nil
})

handleOwnerResp(w, err)
}

Expand All @@ -273,19 +215,10 @@ func (s *Server) handleChangefeedQuery(w http.ResponseWriter, req *http.Request)
writeError(w, http.StatusBadRequest, cerror.ErrSupportPostOnly.GenWithStackByArgs())
return
}
if !config.NewReplicaImpl {
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
if s.owner == nil {
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
if s.capture == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

err := req.ParseForm()
Expand Down
43 changes: 7 additions & 36 deletions cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,31 +113,10 @@ func (s *Server) writeEtcdInfo(ctx context.Context, cli *kv.CDCEtcdClient, w io.
}

func (s *Server) handleDebugInfo(w http.ResponseWriter, req *http.Request) {
if config.NewReplicaImpl {
s.captureV2.WriteDebugInfo(w)
fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n")
s.writeEtcdInfo(req.Context(), s.etcdClient, w)
return
}
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
if s.owner != nil {
fmt.Fprintf(w, "\n\n*** owner info ***:\n\n")
s.owner.writeDebugInfo(w)
}

fmt.Fprintf(w, "\n\n*** processors info ***:\n\n")
if config.NewReplicaImpl {
s.capture.processorManager.WriteDebugInfo(w)
} else {
for _, p := range s.capture.processors {
p.writeDebugInfo(w)
fmt.Fprintf(w, "\n")
}
}

s.capture.WriteDebugInfo(w)
fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n")
s.writeEtcdInfo(req.Context(), &s.capture.etcdClient, w)
s.writeEtcdInfo(req.Context(), s.etcdClient, w)
return
}

func (s *Server) handleStatus(w http.ResponseWriter, req *http.Request) {
Expand All @@ -146,21 +125,13 @@ func (s *Server) handleStatus(w http.ResponseWriter, req *http.Request) {
GitHash: version.GitHash,
Pid: os.Getpid(),
}
if config.NewReplicaImpl {
if s.captureV2 != nil {
st.ID = s.captureV2.Info().ID
st.IsOwner = s.captureV2.IsOwner()
}
writeData(w, st)
return
}
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()

if s.capture != nil {
st.ID = s.capture.info.ID
st.ID = s.capture.Info().ID
st.IsOwner = s.capture.IsOwner()
}
st.IsOwner = s.owner != nil
writeData(w, st)
return
}

func writeInternalServerError(w http.ResponseWriter, err error) {
Expand Down
13 changes: 0 additions & 13 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/tiflow/cdc/processor"
tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/cdc/puller/sorter"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -37,20 +36,8 @@ func init() {
sink.InitMetrics(registry)
entry.InitMetrics(registry)
orchestrator.InitMetrics(registry)
<<<<<<< HEAD
sorter.InitMetrics(registry)
if config.NewReplicaImpl {
processor.InitMetrics(registry)
tablepipeline.InitMetrics(registry)
owner.InitMetrics(registry)
} else {
initProcessorMetrics(registry)
initOwnerMetrics(registry)
}
=======
processor.InitMetrics(registry)
tablepipeline.InitMetrics(registry)
owner.InitMetrics(registry)
>>>>>>> 6591f62df (Clean old owner and old processor in release 5.2 branch (#4019))
initServerMetrics(registry)
}
Loading

0 comments on commit d901d13

Please sign in to comment.