
Commit

Merge branch 'update-go-mysql' of github.com:lance6716/ticdc into update-go-mysql
lance6716 committed Dec 17, 2021
2 parents 8682019 + 5b024dc commit 022b4f1
Showing 13 changed files with 392 additions and 293 deletions.
3 changes: 3 additions & 0 deletions CODE_OF_CONDUCT.md
@@ -0,0 +1,3 @@
# Contributor Covenant Code of Conduct

See the [Contributor Covenant Code of Conduct](https://github.com/pingcap/community/blob/master/CODE_OF_CONDUCT.md)
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -14,7 +14,7 @@ your contribution accepted.

Developing TiDB-CDC requires:

* [Go 1.13+](http://golang.org/doc/code.html)
* [Go 1.16+](https://go.dev/doc/code)
* An internet connection to download the dependencies

Simply run `make` to build the program.
@@ -32,7 +32,7 @@ For more information on how to trigger these tests, please see the [command help

### Updating dependencies

TiDB-CDC uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a
TiDB-CDC uses [Go Modules](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a
dependency: use the `go mod edit` command to change the dependency.

## Contribution flow
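The dependency-update step mentioned in the CONTRIBUTING.md hunk above ("use the `go mod edit` command") can be sketched as follows. This is only an illustration: the module path and version are placeholders and are not taken from this commit, even though the merge itself updates go-mysql.

```sh
# Point go.mod at the desired version of a dependency. The module path and
# version here are illustrative placeholders, not values from this commit.
go mod edit -require=github.com/go-mysql-org/go-mysql@v1.4.0

# Re-resolve the module graph and prune requirements that are no longer used.
go mod tidy

# Rebuild to confirm the project still compiles against the new version.
make
```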
33 changes: 33 additions & 0 deletions SECURITY.md
@@ -0,0 +1,33 @@
# Security Vulnerability Disclosure and Response Process

TiDB is a fast-growing open source database. To ensure its security, a security vulnerability disclosure and response process is adopted.

The primary goal of this process is to reduce the total exposure time of users to publicly known vulnerabilities. To quickly fix vulnerabilities of TiDB products, the security team is responsible for the entire vulnerability management process, including internal communication and external disclosure.

If you find a vulnerability or encounter a security incident involving vulnerabilities of TiDB products, please report it as soon as possible to the TiDB security team (security@tidb.io).

Please provide as much vulnerability information as possible in the following format:

- Issue title*:

- Overview*:

- Affected components and version number*:

- CVE number (if any):

- Vulnerability verification process*:

- Contact information*:

The asterisk (*) indicates the required field.

# Response Time

The TiDB security team will confirm the vulnerabilities and contact you within 2 working days after your submission.

We will publicly thank you after fixing the security vulnerability. To avoid negative impact, please keep the vulnerability confidential until we fix it. We would appreciate it if you could obey the following code of conduct:

The vulnerability will not be disclosed until TiDB releases a patch for it.

The details of the vulnerability, for example, exploits code, will not be disclosed.
187 changes: 0 additions & 187 deletions cdc/owner/async_sink.go

This file was deleted.

31 changes: 15 additions & 16 deletions cdc/owner/changefeed.go
@@ -46,7 +46,7 @@ type changefeed struct {
redoManager redo.LogManager

schema *schemaWrap4Owner
sink AsyncSink
sink DDLSink
ddlPuller DDLPuller
initialized bool
// isRemoved is true if the changefeed is removed
@@ -57,21 +57,20 @@ type changefeed struct {
// After the DDL event has been executed, ddlEventCache will be set to nil.
ddlEventCache *model.DDLEvent

errCh chan error
errCh chan error
// cancel the running goroutine start by `DDLPuller`
cancel context.CancelFunc

// The changefeed will start some backend goroutines in the function `initialize`,
// such as DDLPuller, Sink, etc.
// such as DDLPuller, DDLSink, etc.
// `wg` is used to manage those backend goroutines.
// But it only manages the DDLPuller for now.
// TODO: manage the Sink and other backend goroutines.
wg sync.WaitGroup

metricsChangefeedCheckpointTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func(ctx cdcContext.Context) (AsyncSink, error)
newSink func() DDLSink
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler, error)
}

@@ -88,16 +87,16 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
cancel: func() {},

newDDLPuller: newDDLPuller,
newSink: newDDLSink,
}
c.newSink = newAsyncSink
c.newScheduler = newScheduler
return c
}

func newChangefeed4Test(
id model.ChangeFeedID, gcManager gc.Manager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error),
newSink func() DDLSink,
) *changefeed {
c := newChangefeed(id, gcManager)
c.newDDLPuller = newDDLPuller
@@ -171,7 +170,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
default:
}

c.sink.EmitCheckpointTs(ctx, checkpointTs)
c.sink.emitCheckpointTs(ctx, checkpointTs)
barrierTs, err := c.handleBarrier(ctx)
if err != nil {
return errors.Trace(err)
@@ -262,12 +261,12 @@ LOOP:
if err != nil {
return errors.Trace(err)
}

cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel
c.sink, err = c.newSink(cancelCtx)
if err != nil {
return errors.Trace(err)
}

c.sink = c.newSink()
c.sink.run(cancelCtx, cancelCtx.ChangefeedVars().ID, cancelCtx.ChangefeedVars().Info)

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
@@ -317,7 +316,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
// We don't need to wait sink Close, pass a canceled context is ok
if err := c.sink.Close(canceledCtx); err != nil {
if err := c.sink.close(canceledCtx); err != nil {
log.Warn("Closing sink failed in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err))
}
c.wg.Wait()
@@ -449,7 +448,7 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
return barrierTs, nil
}
nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.SyncPointInterval))
if err := c.sink.SinkSyncpoint(ctx, barrierTs); err != nil {
if err := c.sink.emitSyncPoint(ctx, barrierTs); err != nil {
return 0, errors.Trace(err)
}
c.barriers.Update(syncPointBarrier, nextSyncPointTs)
@@ -496,7 +495,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (done
log.Warn("ignore the DDL job of ineligible table", zap.Reflect("job", job))
return true, nil
}
done, err = c.sink.EmitDDLEvent(ctx, c.ddlEventCache)
done, err = c.sink.emitDDLEvent(ctx, c.ddlEventCache)
if err != nil {
return false, err
}
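The changefeed.go hunks above replace the old AsyncSink (whose implementation in cdc/owner/async_sink.go is deleted in this commit) with a DDLSink that is constructed by a plain newSink() factory and started explicitly via run. Inferred only from the call sites visible in this diff, the interface is presumably shaped roughly like the sketch below; the placeholder types and exact signatures are assumptions, not the actual definition in the repository.

```go
// Sketch of the DDLSink interface as suggested by the call sites in this
// diff (run, emitCheckpointTs, emitDDLEvent, emitSyncPoint, close).
package owner

import "context"

type (
	// cdcCtx stands in for TiCDC's cdcContext.Context.
	cdcCtx = context.Context
	// changeFeedID stands in for model.ChangeFeedID.
	changeFeedID = string
	// changeFeedInfo and ddlEvent stand in for model.ChangeFeedInfo and
	// model.DDLEvent; they are placeholders to keep the sketch self-contained.
	changeFeedInfo struct{}
	ddlEvent       struct{}
)

// DDLSink replaces AsyncSink: it is created synchronously by newSink() and
// started separately with run, instead of being built by a factory that
// takes a context and returns an error.
type DDLSink interface {
	// run starts the sink's background goroutines for the given changefeed.
	run(ctx cdcCtx, id changeFeedID, info *changeFeedInfo)
	// emitCheckpointTs forwards the latest checkpoint ts to the sink.
	emitCheckpointTs(ctx cdcCtx, ts uint64)
	// emitDDLEvent executes a DDL event asynchronously; done reports whether
	// the cached event has finished executing downstream.
	emitDDLEvent(ctx cdcCtx, ddl *ddlEvent) (done bool, err error)
	// emitSyncPoint records a syncpoint at the given barrier ts.
	emitSyncPoint(ctx cdcCtx, checkpointTs uint64) error
	// close releases the sink's resources; the owner passes a canceled
	// context because it does not wait for the close to finish.
	close(ctx context.Context) error
}
```

This matches the renames visible in the hunks above, where tick, handleBarrier, asyncExecDDL, and releaseResources now call the lowercase emitCheckpointTs, emitSyncPoint, emitDDLEvent, and close instead of the exported AsyncSink methods.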