sink(ticdc): change the directory of storage sink only when ddl event occurs (#8881) (#8921)

close #8890, close #8891
ti-chi-bot authored May 17, 2023
1 parent d703cfe commit c8fe772
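
The diff below removes the old per-table-version schema directory and, judging by the updated test expectations, moves the schema file into a per-table meta/ directory whose name carries the table version and a numeric suffix. A minimal Go sketch of the two layouts, for orientation only; the suffix and its computation are assumptions, not part of this commit:

package main

import "fmt"

// oldSchemaPath mirrors the generateSchemaPath helper removed in this commit:
// one directory per table version, always containing schema.json.
func oldSchemaPath(schema, table string, tableVersion uint64) string {
	return fmt.Sprintf("%s/%s/%d/schema.json", schema, table, tableVersion)
}

// newSchemaPath sketches the layout the updated tests expect: a single meta/
// directory per table, with the table version and a numeric suffix (treated
// here as an opaque placeholder) encoded in the file name.
func newSchemaPath(schema, table string, tableVersion uint64, suffix uint32) string {
	return fmt.Sprintf("%s/%s/meta/schema_%d_%d.json", schema, table, tableVersion, suffix)
}

func main() {
	fmt.Println(oldSchemaPath("test", "table1", 100))             // test/table1/100/schema.json
	fmt.Println(newSchemaPath("test", "table1", 100, 4192708364)) // test/table1/meta/schema_100_4192708364.json
}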
Showing 31 changed files with 1,085 additions and 500 deletions.
3 changes: 3 additions & 0 deletions Makefile
@@ -218,6 +218,9 @@ build_mysql_integration_test_images: clean_integration_test_containers
integration_test_kafka: check_third_party_binary
tests/integration_tests/run.sh kafka "$(CASE)" "$(START_AT)"

integration_test_storage:
tests/integration_tests/run.sh storage "$(CASE)" "$(START_AT)"

kafka_docker_integration_test: ## Run TiCDC Kafka all integration tests in Docker.
kafka_docker_integration_test: clean_integration_test_containers
docker-compose -f $(TICDC_DOCKER_DEPLOYMENTS_DIR)/docker-compose-kafka-integration.yml up
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
@@ -279,6 +279,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
Terminator: c.Sink.Terminator,
DateSeparator: c.Sink.DateSeparator,
EnablePartitionSeparator: c.Sink.EnablePartitionSeparator,
FileIndexWidth: c.Sink.FileIndexWidth,
}
}
if c.Mounter != nil {
@@ -383,6 +384,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
Terminator: cloned.Sink.Terminator,
DateSeparator: cloned.Sink.DateSeparator,
EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator,
FileIndexWidth: cloned.Sink.FileIndexWidth,
}
}
if cloned.Consistent != nil {
@@ -512,6 +514,7 @@ type SinkConfig struct {
Terminator string `json:"terminator"`
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
FileIndexWidth int `json:"file_index_width"`
}

// CSVConfig denotes the csv config
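
The SinkConfig change above only adds the FileIndexWidth field (file_index_width in the API). Below is a hedged sketch of how it appears to relate to data file names such as CDC000001.csv in the tests further down, where FileIndexWidth is set to 6; the dataFileName helper is hypothetical, not the sink's actual implementation:

package main

import "fmt"

// SinkConfig repeats the fields shown in the diff above; FileIndexWidth is the new one.
type SinkConfig struct {
	Terminator               string `json:"terminator"`
	DateSeparator            string `json:"date_separator"`
	EnablePartitionSeparator bool   `json:"enable_partition_separator"`
	FileIndexWidth           int    `json:"file_index_width"`
}

// dataFileName zero-pads the file index to the configured width, which is
// consistent with the CDC000001.csv / CDC000002.csv names the tests expect
// when FileIndexWidth is 6. This helper is an illustration, not code from
// the repository.
func dataFileName(index uint64, width int, ext string) string {
	return fmt.Sprintf("CDC%0*d.%s", width, index, ext)
}

func main() {
	cfg := SinkConfig{FileIndexWidth: 6}
	fmt.Println(dataFileName(1, cfg.FileIndexWidth, "csv")) // CDC000001.csv
	fmt.Println(dataFileName(2, cfg.FileIndexWidth, "csv")) // CDC000002.csv
}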
6 changes: 4 additions & 2 deletions cdc/owner/schema.go
@@ -221,10 +221,12 @@ func (s *schemaWrap4Owner) BuildDDLEvents(
if job.BinlogInfo != nil && job.BinlogInfo.TableInfo != nil {
tableInfo = model.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.FinishedTS, job.BinlogInfo.TableInfo)
} else {
// for an invalid DDL job or a DDL job that does not contain TableInfo,
// just retrieve the schema name.
// Just retrieve the schema name for a DDL job that does not contain TableInfo.
// Currently supported by cdc are: ActionCreateSchema, ActionDropSchema,
// and ActionModifySchemaCharsetAndCollate.
tableInfo = &model.TableInfo{
TableName: model.TableName{Schema: job.SchemaName},
Version: job.BinlogInfo.FinishedTS,
}
}
event.FromJob(job, preTableInfo, tableInfo)
22 changes: 10 additions & 12 deletions cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
@@ -16,10 +16,10 @@ package cloudstorage
import (
"context"
"encoding/json"
"fmt"
"net/url"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

// Assert DDLEventSink implementation
@@ -58,24 +59,21 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er
return d, nil
}

func generateSchemaPath(def cloudstorage.TableDefinition) string {
return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion)
}

// WriteDDLEvent writes the ddl event to the cloud storage.
func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
var def cloudstorage.TableDefinition

if ddl.TableInfo.TableInfo == nil {
return nil
}

def.FromDDLEvent(ddl)
encodedDef, err := json.MarshalIndent(def, "", " ")
encodedDef, err := def.MarshalWithQuery()
if err != nil {
return errors.Trace(err)
}

path := generateSchemaPath(def)
path, err := def.GenerateSchemaFilePath()
if err != nil {
return errors.Trace(err)
}
log.Debug("write ddl event to external storage",
zap.String("path", path), zap.Any("ddl", ddl))
err = d.statistics.RecordDDLExecution(func() error {
err1 := d.storage.WriteFile(ctx, path, encodedDef)
if err1 != nil {
cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
@@ -63,12 +63,11 @@ func TestWriteDDLEvent(t *testing.T) {
},
},
}
tableDir := path.Join(parentDir, "test/table1/100")
os.MkdirAll(tableDir, 0o755)
tableDir := path.Join(parentDir, "test/table1/meta/")
err = sink.WriteDDLEvent(ctx, ddlEvent)
require.Nil(t, err)

tableSchema, err := os.ReadFile(path.Join(tableDir, "schema.json"))
tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json"))
require.Nil(t, err)
require.JSONEq(t, `{
"Table": "table1",
@@ -124,8 +123,6 @@ func TestWriteCheckpointTs(t *testing.T) {
},
},
}
table1Dir := path.Join(parentDir, "test/table1/100")
os.MkdirAll(table1Dir, 0o755)

err = sink.WriteCheckpointTs(ctx, 100, tables)
require.Nil(t, err)
146 changes: 72 additions & 74 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
@@ -25,8 +25,9 @@ import (
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
dmlsink "github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
@@ -39,16 +40,34 @@ func setClock(s *dmlSink, clock clock.Clock) {
}
}

func getTableFiles(t *testing.T, tableDir string) []string {
files, err := os.ReadDir(tableDir)
require.Nil(t, err)

fileNames := []string{}
for _, f := range files {
fileName := f.Name()
if f.IsDir() {
metaFiles, err := os.ReadDir(path.Join(tableDir, f.Name()))
require.Nil(t, err)
require.Len(t, metaFiles, 1)
fileName = metaFiles[0].Name()
}
fileNames = append(fileNames, fileName)
}
return fileNames
}

func generateTxnEvents(
cnt *uint64,
batch int,
tableStatus *state.TableSinkState,
) []*eventsink.TxnCallbackableEvent {
) []*dmlsink.TxnCallbackableEvent {
// assume we have a large transaction and it is splitted into 10 small transactions
txns := make([]*eventsink.TxnCallbackableEvent, 0, 10)
txns := make([]*dmlsink.TxnCallbackableEvent, 0, 10)

for i := 0; i < 10; i++ {
txn := &eventsink.TxnCallbackableEvent{
txn := &dmlsink.TxnCallbackableEvent{
Event: &model.SingleTableTxn{
CommitTs: 100,
Table: &model.TableName{Schema: "test", Table: "table1"},
@@ -80,6 +99,10 @@ func generateTxnEvents(
{Name: "c1", Value: i*batch + j},
{Name: "c2", Value: "hello world"},
},
ColInfos: []rowcodec.ColInfo{
{ID: 1, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 2, Ft: types.NewFieldType(mysql.TypeVarchar)},
},
}
txn.Event.Rows = append(txn.Event.Rows, row)
}
@@ -99,8 +122,8 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
require.Nil(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolOpen.String()

replicaConfig.Sink.Protocol = config.ProtocolCsv.String()
replicaConfig.Sink.FileIndexWidth = 6
errCh := make(chan error, 5)
s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
@@ -110,51 +133,45 @@

// generating one dml file.
txns := generateTxnEvents(&cnt, batch, &tableStatus)
tableDir := path.Join(parentDir, "test/table1/33")
err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)

files, err := os.ReadDir(tableDir)
metaDir := path.Join(parentDir, "test/table1/meta")
files, err := os.ReadDir(metaDir)
require.Nil(t, err)
require.Len(t, files, 3)
var fileNames []string
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{"CDC000001.json", "schema.json", "CDC.index"}, fileNames)
content, err := os.ReadFile(path.Join(tableDir, "CDC000001.json"))
require.Len(t, files, 1)

tableDir := path.Join(parentDir, "test/table1/33")
fileNames := getTableFiles(t, tableDir)
require.Len(t, fileNames, 2)
require.ElementsMatch(t, []string{"CDC000001.csv", "CDC.index"}, fileNames)
content, err := os.ReadFile(path.Join(tableDir, "CDC000001.csv"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000001.json\n", string(content))
require.Equal(t, "CDC000001.csv\n", string(content))
require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt))

// generating another dml file.
err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)

files, err = os.ReadDir(tableDir)
require.Nil(t, err)
require.Len(t, files, 4)
fileNames = nil
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
fileNames = getTableFiles(t, tableDir)
require.Len(t, fileNames, 3)
require.ElementsMatch(t, []string{
"CDC000001.json", "CDC000002.json",
"schema.json", "CDC.index",
"CDC000001.csv", "CDC000002.csv", "CDC.index",
}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json"))
content, err = os.ReadFile(path.Join(tableDir, "CDC000002.csv"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000002.json\n", string(content))
require.Equal(t, "CDC000002.csv\n", string(content))
require.Equal(t, uint64(2000), atomic.LoadUint64(&cnt))

cancel()
@@ -171,8 +188,9 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
require.Nil(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolOpen.String()
replicaConfig.Sink.Protocol = config.ProtocolCsv.String()
replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String()
replicaConfig.Sink.FileIndexWidth = 6

errCh := make(chan error, 5)
s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
@@ -191,21 +209,16 @@
require.Nil(t, err)
time.Sleep(3 * time.Second)

files, err := os.ReadDir(tableDir)
require.Nil(t, err)
require.Len(t, files, 2)
var fileNames []string
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{"CDC000001.json", "CDC.index"}, fileNames)
content, err := os.ReadFile(path.Join(tableDir, "CDC000001.json"))
fileNames := getTableFiles(t, tableDir)
require.Len(t, fileNames, 2)
require.ElementsMatch(t, []string{"CDC000001.csv", "CDC.index"}, fileNames)
content, err := os.ReadFile(path.Join(tableDir, "CDC000001.csv"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000001.json\n", string(content))
require.Equal(t, "CDC000001.csv\n", string(content))
require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt))

// test date (day) is NOT changed.
@@ -216,21 +229,16 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
require.Nil(t, err)
time.Sleep(3 * time.Second)

files, err = os.ReadDir(tableDir)
require.Nil(t, err)
require.Len(t, files, 3)
fileNames = nil
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{"CDC000001.json", "CDC000002.json", "CDC.index"}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json"))
fileNames = getTableFiles(t, tableDir)
require.Len(t, fileNames, 3)
require.ElementsMatch(t, []string{"CDC000001.csv", "CDC000002.csv", "CDC.index"}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000002.csv"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000002.json\n", string(content))
require.Equal(t, "CDC000002.csv\n", string(content))
require.Equal(t, uint64(2000), atomic.LoadUint64(&cnt))

// test date (day) is changed.
@@ -242,21 +250,16 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
time.Sleep(3 * time.Second)

tableDir = path.Join(parentDir, "test/table1/33/2023-03-09")
files, err = os.ReadDir(tableDir)
require.Nil(t, err)
require.Len(t, files, 2)
fileNames = nil
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{"CDC000001.json", "CDC.index"}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000001.json"))
fileNames = getTableFiles(t, tableDir)
require.Len(t, fileNames, 2)
require.ElementsMatch(t, []string{"CDC000001.csv", "CDC.index"}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000001.csv"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000001.json\n", string(content))
require.Equal(t, "CDC000001.csv\n", string(content))
require.Equal(t, uint64(3000), atomic.LoadUint64(&cnt))
cancel()
s.Close()
@@ -274,21 +277,16 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
require.Nil(t, err)
time.Sleep(3 * time.Second)

files, err = os.ReadDir(tableDir)
require.Nil(t, err)
require.Len(t, files, 3)
fileNames = nil
for _, f := range files {
fileNames = append(fileNames, f.Name())
}
require.ElementsMatch(t, []string{"CDC000001.json", "CDC000002.json", "CDC.index"}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json"))
fileNames = getTableFiles(t, tableDir)
require.Len(t, fileNames, 3)
require.ElementsMatch(t, []string{"CDC000001.csv", "CDC000002.csv", "CDC.index"}, fileNames)
content, err = os.ReadFile(path.Join(tableDir, "CDC000002.csv"))
require.Nil(t, err)
require.Greater(t, len(content), 0)

content, err = os.ReadFile(path.Join(tableDir, "CDC.index"))
content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index"))
require.Nil(t, err)
require.Equal(t, "CDC000002.json\n", string(content))
require.Equal(t, "CDC000002.csv\n", string(content))
require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt))

cancel()
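
Read together, the updated tests exercise a per-table layout along the following lines. This is an interpretation of the test expectations, not documentation of the sink, and the annotations are assumptions. The table-version directory is the part that now changes only when a DDL event occurs, which is what the commit title describes; without a date_separator the data files and their meta/CDC.index sit directly under the version directory.

test/table1/                           <schema>/<table>/
    meta/
        schema_100_4192708364.json     table schema, rewritten when a DDL event occurs
    33/                                <table version>/
        2023-03-08/                    present only when date_separator is set (e.g. 'day')
            CDC000001.csv              data files (CSV in these tests)
            CDC000002.csv
            meta/
                CDC.index              records the name of the most recently written data file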