Skip to content

Commit

Permalink
resolved conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen committed May 5, 2022
1 parent 8559044 commit 4122dc4
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 9 deletions.
1 change: 0 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down
1 change: 0 additions & 1 deletion cdc/owner/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func newDDLPuller(ctx cdcContext.Context, upStream *upstream.Upstream, startTs u
kvStorage,
upStream.PDClock,
// Add "_ddl_puller" to make it different from table pullers.
ctx.ChangefeedVars().ID+"_ddl_puller",
model.ChangeFeedID{
Namespace: ctx.ChangefeedVars().ID.Namespace,
// Add "_ddl_puller" to make it different from table pullers.
Expand Down
8 changes: 4 additions & 4 deletions pkg/txnutil/gc/gc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,18 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) {
time.Sleep(1 * time.Second)

cfID := model.DefaultChangeFeedID("cfID")
err = gcManager.CheckStaleCheckpointTs(cCtx, cfID, 10)
err := gcManager.CheckStaleCheckpointTs(ctx, cfID, 10)
c.Assert(cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)), check.IsTrue)
c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue)

err = gcManager.CheckStaleCheckpointTs(cCtx, cfID, oracle.GoTimeToTS(time.Now()))
err = gcManager.CheckStaleCheckpointTs(ctx, cfID, oracle.GoTimeToTS(time.Now()))
c.Assert(err, check.IsNil)

gcManager.isTiCDCBlockGC = false
gcManager.lastSafePointTs = 20

err = gcManager.CheckStaleCheckpointTs(cCtx, cfID, 10)
err = gcManager.CheckStaleCheckpointTs(ctx, cfID, 10)

c.Assert(cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)), check.IsTrue)
c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue)
}
5 changes: 2 additions & 3 deletions pkg/upstream/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ package upstream

import (
"context"
"log"
"sync"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/config"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -48,8 +47,8 @@ func NewManager(ctx context.Context) *Manager {
// NewManager4Test returns a Manager for unit test.
func NewManager4Test(pdClient pd.Client) *Manager {
up := NewUpstream4Test(pdClient)
atomic.StoreInt32(&up.status, normal)
res := &Manager{ups: new(sync.Map)}
res.ups.Store(DefaultClusterID, up)
return res
}

Expand Down

0 comments on commit 4122dc4

Please sign in to comment.