Skip to content

Commit

Permalink
sink(ticdc): cleanup expired files by day for storage sink (pingcap#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 27, 2023
1 parent 587b155 commit c2c775d
Show file tree
Hide file tree
Showing 21 changed files with 874 additions and 53 deletions.
62 changes: 49 additions & 13 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,18 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
LargeMessageHandle: largeMessageHandle,
}
}

if c.Sink.CloudStorageConfig != nil {
res.Sink.CloudStorageConfig = &config.CloudStorageConfig{
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
}
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
Expand Down Expand Up @@ -465,6 +477,18 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
LargeMessageHandle: largeMessageHandle,
}
}

if cloned.Sink.CloudStorageConfig != nil {
res.Sink.CloudStorageConfig = &CloudStorageConfig{
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
}
}
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
Expand Down Expand Up @@ -586,19 +610,20 @@ type Table struct {
// SinkConfig represents sink config for a changefeed
// This is a duplicate of config.SinkConfig
type SinkConfig struct {
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
DispatchRules []*DispatchRule `json:"dispatchers,omitempty"`
ColumnSelectors []*ColumnSelector `json:"column_selectors"`
TxnAtomicity string `json:"transaction_atomicity"`
EncoderConcurrency int `json:"encoder_concurrency"`
Terminator string `json:"terminator"`
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
FileIndexWidth int `json:"file_index_width"`
KafkaConfig *KafkaConfig `json:"kafka_config"`
AdvanceTimeoutInSec uint `json:"advance_timeout,omitempty"`
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
DispatchRules []*DispatchRule `json:"dispatchers,omitempty"`
ColumnSelectors []*ColumnSelector `json:"column_selectors"`
TxnAtomicity string `json:"transaction_atomicity"`
EncoderConcurrency int `json:"encoder_concurrency"`
Terminator string `json:"terminator"`
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
FileIndexWidth int `json:"file_index_width"`
KafkaConfig *KafkaConfig `json:"kafka_config"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeoutInSec uint `json:"advance_timeout,omitempty"`
}

// KafkaConfig represents kafka config for a changefeed.
Expand All @@ -615,6 +640,17 @@ type KafkaConfig struct {
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
}

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
}

// CSVConfig denotes the csv config
// This is the same as config.CSVConfig
type CSVConfig struct {
Expand Down
8 changes: 5 additions & 3 deletions cdc/redo/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,14 @@ func TestGCAndCleanup(t *testing.T) {
cancel()
require.ErrorIs(t, eg.Wait(), context.Canceled)

m.Cleanup(ctx)
ret, err := extStorage.FileExists(ctx, getDeletedChangefeedMarker(changefeedID))
clenupCtx, clenupCancel := context.WithCancel(context.Background())
defer clenupCancel()
m.Cleanup(clenupCtx)
ret, err := extStorage.FileExists(clenupCtx, getDeletedChangefeedMarker(changefeedID))
require.NoError(t, err)
require.True(t, ret)
cnt := 0
extStorage.WalkDir(ctx, nil, func(path string, size int64) error {
extStorage.WalkDir(clenupCtx, nil, func(path string, size int64) error {
cnt++
return nil
})
Expand Down
166 changes: 157 additions & 9 deletions cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,88 @@ import (
"context"
"encoding/json"
"net/url"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/util"
"github.com/robfig/cron"
"go.uber.org/zap"
)

// Assert DDLEventSink implementation
var _ ddlsink.DDLEventSink = (*ddlSink)(nil)
var _ ddlsink.DDLEventSink = (*DDLSink)(nil)

type ddlSink struct {
// DDLSink is a sink that writes ddl events to cloud storage.
type DDLSink struct {
// id indicates which changefeed this sink belongs to.
id model.ChangeFeedID
// statistic is used to record the DDL metrics
statistics *metrics.Statistics
storage storage.ExternalStorage
cfg *cloudstorage.Config
cron *cron.Cron

lastCheckpointTs atomic.Uint64
lastSendCheckpointTsTime time.Time
}

// NewCloudStorageDDLSink creates a ddl sink for cloud storage.
func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, error) {
// NewDDLSink creates a ddl sink for cloud storage.
func NewDDLSink(ctx context.Context,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
) (*DDLSink, error) {
return newDDLSink(ctx, sinkURI, replicaConfig, nil)
}

func newDDLSink(ctx context.Context,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
cleanupJobs []func(), /* only for test */
) (*DDLSink, error) {
// create cloud storage config and then apply the params of sinkURI to it.
cfg := cloudstorage.NewConfig()
err := cfg.Apply(ctx, sinkURI, replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}

storage, err := util.GetExternalStorageFromURI(ctx, sinkURI.String())
if err != nil {
return nil, err
}

changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
d := &ddlSink{
d := &DDLSink{
id: changefeedID,
storage: storage,
statistics: metrics.NewStatistics(ctx, sink.TxnSink),
cfg: cfg,
lastSendCheckpointTsTime: time.Now(),
}

if err := d.initCron(ctx, sinkURI, cleanupJobs); err != nil {
return nil, errors.Trace(err)
}
// Note: It is intended to run the cleanup goroutine in the background.
// we don't wait for it to finish since the gourotine would be stuck if
// the downstream is abnormal, especially when the downstream is a nfs.
go d.bgCleanup(ctx)
return d, nil
}

// WriteDDLEvent writes the ddl event to the cloud storage.
func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
writeFile := func(def cloudstorage.TableDefinition) error {
encodedDef, err := def.MarshalWithQuery()
if err != nil {
Expand Down Expand Up @@ -103,7 +136,8 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return nil
}

func (d *ddlSink) WriteCheckpointTs(ctx context.Context,
// WriteCheckpointTs writes a checkpoint timestamp to the sink.
func (d *DDLSink) WriteCheckpointTs(ctx context.Context,
ts uint64, tables []*model.TableInfo,
) error {
if time.Since(d.lastSendCheckpointTsTime) < 2*time.Second {
Expand All @@ -115,6 +149,7 @@ func (d *ddlSink) WriteCheckpointTs(ctx context.Context,

defer func() {
d.lastSendCheckpointTsTime = time.Now()
d.lastCheckpointTs.Store(ts)
}()
ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts})
if err != nil {
Expand All @@ -124,7 +159,120 @@ func (d *ddlSink) WriteCheckpointTs(ctx context.Context,
return errors.Trace(err)
}

func (d *ddlSink) Close() error {
func (d *DDLSink) initCron(
ctx context.Context, sinkURI *url.URL, cleanupJobs []func(),
) (err error) {
if cleanupJobs == nil {
cleanupJobs = d.genCleanupJob(ctx, sinkURI)
}

d.cron = cron.New()
for _, job := range cleanupJobs {
err = d.cron.AddFunc(d.cfg.FileCleanupCronSpec, job)
if err != nil {
return err
}
}
return nil
}

func (d *DDLSink) bgCleanup(ctx context.Context) {
if d.cfg.DateSeparator != config.DateSeparatorDay.String() || d.cfg.FileExpirationDays <= 0 {
log.Info("skip cleanup expired files for storage sink",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.String("dateSeparator", d.cfg.DateSeparator),
zap.Int("expiredFileTTL", d.cfg.FileExpirationDays))
return
}

d.cron.Start()
defer d.cron.Stop()
log.Info("start schedule cleanup expired files for storage sink",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.String("dateSeparator", d.cfg.DateSeparator),
zap.Int("expiredFileTTL", d.cfg.FileExpirationDays))

// wait for the context done
<-ctx.Done()
log.Info("stop schedule cleanup expired files for storage sink",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Error(ctx.Err()))
}

func (d *DDLSink) genCleanupJob(ctx context.Context, uri *url.URL) []func() {
ret := []func(){}

isLocal := uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == ""
isRemoveEmptyDirsRuning := atomic.Bool{}
if isLocal {
ret = append(ret, func() {
if !isRemoveEmptyDirsRuning.CompareAndSwap(false, true) {
log.Warn("remove empty dirs is already running, skip this round",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID))
return
}

checkpointTs := d.lastCheckpointTs.Load()
start := time.Now()
cnt, err := cloudstorage.RemoveEmptyDirs(ctx, d.id, uri.Path)
if err != nil {
log.Error("failed to remove empty dirs",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Duration("cost", time.Since(start)),
zap.Error(err),
)
return
}
log.Info("remove empty dirs",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("count", cnt),
zap.Duration("cost", time.Since(start)))
})
}

isCleanupRunning := atomic.Bool{}
ret = append(ret, func() {
if !isCleanupRunning.CompareAndSwap(false, true) {
log.Warn("cleanup expired files is already running, skip this round",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID))
return
}

defer isCleanupRunning.Store(false)
start := time.Now()
checkpointTs := d.lastCheckpointTs.Load()
cnt, err := cloudstorage.RemoveExpiredFiles(ctx, d.id, d.storage, d.cfg, checkpointTs)
if err != nil {
log.Error("failed to remove expired files",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Duration("cost", time.Since(start)),
zap.Error(err),
)
return
}
log.Info("remove expired files",
zap.String("namespace", d.id.Namespace),
zap.String("changefeedID", d.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("count", cnt),
zap.Duration("cost", time.Since(start)))
})
return ret
}

// Close closes the sink.
func (d *DDLSink) Close() error {
if d.statistics != nil {
d.statistics.Close()
}
Expand Down
Loading

0 comments on commit c2c775d

Please sign in to comment.