Skip to content

Commit

Permalink
Merge branch 'release-4.0' into cherry-pick-3887-to-release-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus authored Dec 20, 2021
2 parents 9d96276 + 784e7d0 commit 4f9a877
Show file tree
Hide file tree
Showing 338 changed files with 1,112 additions and 1,410 deletions.
13 changes: 13 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,16 @@ linters:
enable:
- unconvert
- unparam
- depguard

linters-settings:
depguard:
list-type: blacklist
include-go-root: false
packages:
- log
- github.com/juju/errors
packages-with-error-message:
# specify an error message to output when a blacklisted package is used
- log: "logging is allowed only by pingcap/log"
- github.com/juju/errors: "error handling is allowed only by pingcap/errors"
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
FROM golang:1.14-alpine as builder
RUN apk add --no-cache git make bash
WORKDIR /go/src/github.com/pingcap/ticdc
WORKDIR /go/src/github.com/pingcap/tiflow
COPY . .
ENV CDC_ENABLE_VENDOR=0
RUN make

FROM alpine:3.12
RUN apk add --no-cache tzdata bash curl socat
COPY --from=builder /go/src/github.com/pingcap/ticdc/bin/cdc /cdc
COPY --from=builder /go/src/github.com/pingcap/tiflow/bin/cdc /cdc
EXPOSE 8300
CMD [ "/cdc" ]
4 changes: 2 additions & 2 deletions Dockerfile.development
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM golang:1.14-alpine as builder
RUN apk add --no-cache git make bash
WORKDIR /go/src/github.com/pingcap/ticdc
WORKDIR /go/src/github.com/pingcap/tiflow
COPY . .
ENV CDC_ENABLE_VENDOR=1
RUN go mod vendor
Expand All @@ -10,6 +10,6 @@ RUN make failpoint-disable

FROM alpine:3.12
RUN apk add --no-cache tzdata bash curl socat
COPY --from=builder /go/src/github.com/pingcap/ticdc/bin/cdc /cdc
COPY --from=builder /go/src/github.com/pingcap/tiflow/bin/cdc /cdc
EXPOSE 8300
CMD [ "/cdc" ]
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
.PHONY: build test check clean fmt cdc kafka_consumer coverage \
integration_test_build integration_test integration_test_mysql integration_test_kafka

PROJECT=ticdc
PROJECT=tiflow

FAIL_ON_STDOUT := awk '{ print } END { if (NR > 0) { exit 1 } }'

Expand Down Expand Up @@ -34,7 +34,7 @@ PACKAGES := $$($(PACKAGE_LIST))
PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||'
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor|kv_gen|proto')
TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration|testing_utils')
CDC_PKG := github.com/pingcap/ticdc
CDC_PKG := github.com/pingcap/tiflow
FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/pingcap/$(PROJECT)/"}|grep -v "github.com/pingcap/$(PROJECT)"; done)
FAILPOINT := bin/failpoint-ctl

Expand Down Expand Up @@ -125,8 +125,8 @@ integration_test_build: check_failpoint_ctl
./scripts/fix_lib_zstd.sh
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/ticdc/... \
-o bin/cdc.test github.com/pingcap/ticdc/cmd/cdc \
-coverpkg=github.com/pingcap/tiflow/... \
-o bin/cdc.test github.com/pingcap/tiflow/cmd/cdc \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc/main.go \
|| { $(FAILPOINT_DISABLE); exit 1; }
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# TiCDC

[![Build Status](https://internal.pingcap.net/idc-jenkins/job/build_cdc_master/badge/icon)](https://internal.pingcap.net/idc-jenkins/job/build_cdc_master/)
[![codecov](https://codecov.io/gh/pingcap/ticdc/branch/master/graph/badge.svg)](https://codecov.io/gh/pingcap/ticdc)
[![Coverage Status](https://coveralls.io/repos/github/pingcap/ticdc/badge.svg)](https://coveralls.io/github/pingcap/ticdc)
[![LICENSE](https://img.shields.io/github/license/pingcap/ticdc.svg)](https://github.com/pingcap/ticdc/blob/master/LICENSE)
[![Go Report Card](https://goreportcard.com/badge/github.com/pingcap/ticdc)](https://goreportcard.com/report/github.com/pingcap/ticdc)
[![codecov](https://codecov.io/gh/pingcap/tiflow/branch/master/graph/badge.svg)](https://codecov.io/gh/pingcap/tiflow)
[![Coverage Status](https://coveralls.io/repos/github/pingcap/tiflow/badge.svg)](https://coveralls.io/github/pingcap/tiflow)
[![LICENSE](https://img.shields.io/github/license/pingcap/tiflow.svg)](https://github.com/pingcap/tiflow/blob/master/LICENSE)
[![Go Report Card](https://goreportcard.com/badge/github.com/pingcap/tiflow)](https://goreportcard.com/report/github.com/pingcap/tiflow)

**TiCDC** is [TiDB](https://docs.pingcap.com/tidb/stable)'s change data capture framework. It supports replicating change data to various downstreams, including MySQL protocol-compatible databases, message queues via the open CDC protocol and other systems such as local file storage.

Expand Down
10 changes: 5 additions & 5 deletions cdc/async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"go.uber.org/zap"
)

Expand Down
14 changes: 7 additions & 7 deletions cdc/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/util/testleak"
)

var _ = check.Suite(&asyncSinkSuite{})
Expand Down
18 changes: 9 additions & 9 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/processor"
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
Expand Down
20 changes: 10 additions & 10 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/owner"
"github.com/pingcap/ticdc/cdc/processor"
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/processor"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/version"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
Expand Down
18 changes: 9 additions & 9 deletions cdc/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/util/testleak"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
Expand Down Expand Up @@ -113,10 +113,10 @@ func (s *captureSuite) TestCaptureSessionDoneDuringHandleTask(c *check.C) {
c.Assert(err, check.IsNil)

runProcessorCount := 0
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/captureHandleTaskDelay", "sleep(500)")
err = failpoint.Enable("github.com/pingcap/tiflow/cdc/captureHandleTaskDelay", "sleep(500)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/captureHandleTaskDelay")
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/captureHandleTaskDelay")
}()
runProcessorBackup := runProcessorImpl
runProcessorImpl = func(
Expand Down
18 changes: 9 additions & 9 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/cyclic/mark"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink"
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/scheduler"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand Down
10 changes: 5 additions & 5 deletions cdc/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (

"github.com/pingcap/check"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/util/testleak"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/embed"
Expand Down
4 changes: 2 additions & 2 deletions cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"time"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/ticdc/cdc/model"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"testing"

"github.com/pingcap/check"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tiflow/pkg/util/testleak"
)

func Test(t *testing.T) { check.TestingT(t) }
Expand Down
77 changes: 36 additions & 41 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/ticdc/cdc/model"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -310,21 +310,20 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
colName := colInfo.Name.O
colDatums, exist := datums[colInfo.ID]
var colValue interface{}
if !exist && !fillWithDefaultValue {
continue
}
var err error
var warn string
if exist {
var err error
var warn string
colValue, warn, err = formatColVal(colDatums, colInfo.Tp)
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
} else if fillWithDefaultValue {
colValue, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
colValue = getDefaultOrZeroValue(colInfo)
} else {
continue
}
cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{
Name: colName,
Expand Down Expand Up @@ -442,39 +441,35 @@ func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, e
}
return v, warn, nil
default:
// FIXME: GetValue() may return some types that go sql not support, which will cause sink DML fail
// Make specified convert upper if you need
// Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
return datum.GetValue(), "", nil
}
}

// getDefaultOrZeroValue return interface{} need to meet to require type in
// https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
// Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support
func getDefaultOrZeroValue(col *timodel.ColumnInfo) (value interface{}, warn string, err error) {
var d types.Datum
func getDefaultOrZeroValue(col *timodel.ColumnInfo) interface{} {
// see https://github.com/pingcap/tidb/issues/9304
// must use null if TiDB not write the column value when default value is null
// and the value is null
if !mysql.HasNotNullFlag(col.Flag) {
// see https://github.com/pingcap/tidb/issues/9304
// must use null if TiDB not write the column value when default value is null
// and the value is null
d = types.NewDatum(nil)
} else if col.GetDefaultValue() != nil {
d = types.NewDatum(col.GetDefaultValue())
} else {
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d = types.NewDatum(col.FieldType.Elems[0])
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes, "", nil
default:
d = table.GetZeroValue(col)
}
d := types.NewDatum(nil)
return d.GetValue()
}

if col.GetDefaultValue() != nil {
d := types.NewDatum(col.GetDefaultValue())
return d.GetValue()
}
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d := types.NewDatum(col.FieldType.Elems[0])
return d.GetValue()
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes
}

return formatColVal(d, col.Tp)
d := table.GetZeroValue(col)
return d.GetValue()
}

func fetchHandleValue(tableInfo *model.TableInfo, recordID int64) (pkCoID int64, pkValue *types.Datum, err error) {
Expand Down
Loading

0 comments on commit 4f9a877

Please sign in to comment.