diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index 7e77c9df2a05b..a2a6a743efeb2 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -170,7 +170,7 @@ func MakeTableRegions(
 		go func() {
 			defer wg.Done()
 			for info := range fileChan {
-				regions, sizes, err := makeSourceFileRegion(execCtx, meta, info, columns, cfg, ioWorkers, store)
+				regions, sizes, err := MakeSourceFileRegion(execCtx, meta, info, columns, cfg, ioWorkers, store)
 				select {
 				case resultChan <- fileRegionRes{info: info, regions: regions, sizes: sizes, err: err}:
 				case <-ctx.Done():
@@ -255,7 +255,8 @@ func MakeTableRegions(
 	return filesRegions, nil
 }
 
-func makeSourceFileRegion(
+// MakeSourceFileRegion creates a new source file region.
+func MakeSourceFileRegion(
 	ctx context.Context,
 	meta *MDTableMeta,
 	fi FileInfo,
@@ -283,7 +284,9 @@ func makeSourceFileRegion(
 	// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
 	// like dumpling might be slight exceed the threshold when it is equal `max-region-size`, so we can
 	// avoid split a lot of small chunks.
-	if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
+	// If a csv file is compressed, we can't split it now because we can't get the exact size of a row.
+	if isCsvFile && cfg.Mydumper.StrictFormat && fi.FileMeta.Compression == CompressionNone &&
+		dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
 		_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
 		return regions, subFileSizes, err
 	}
diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go
index 5c4bc1c7734b5..0830d378f47ff 100644
--- a/br/pkg/lightning/mydump/region_test.go
+++ b/br/pkg/lightning/mydump/region_test.go
@@ -164,6 +164,63 @@ func TestAllocateEngineIDs(t *testing.T) {
 	})
 }
 
+func TestMakeSourceFileRegion(t *testing.T) {
+	meta := &MDTableMeta{
+		DB:   "csv",
+		Name: "large_csv_file",
+	}
+	cfg := &config.Config{
+		Mydumper: config.MydumperRuntime{
+			ReadBlockSize: config.ReadBlockSize,
+			MaxRegionSize: 1,
+			CSV: config.CSVConfig{
+				Separator:       ",",
+				Delimiter:       "",
+				Header:          true,
+				TrimLastSep:     false,
+				NotNull:         false,
+				Null:            "NULL",
+				BackslashEscape: true,
+			},
+			StrictFormat: true,
+			Filter:       []string{"*.*"},
+		},
+	}
+	filePath := "./csv/split_large_file.csv"
+	dataFileInfo, err := os.Stat(filePath)
+	require.NoError(t, err)
+	fileSize := dataFileInfo.Size()
+	fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: filePath, Type: SourceTypeCSV, FileSize: fileSize}}
+	colCnt := 3
+	columns := []string{"a", "b", "c"}
+
+	ctx := context.Background()
+	ioWorkers := worker.NewPool(ctx, 4, "io")
+	store, err := storage.NewLocalStorage(".")
+	assert.NoError(t, err)
+
+	// test - no compression
+	fileInfo.FileMeta.Compression = CompressionNone
+	regions, _, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
+	assert.NoError(t, err)
+	offsets := [][]int64{{6, 12}, {12, 18}, {18, 24}, {24, 30}}
+	assert.Len(t, regions, len(offsets))
+	for i := range offsets {
+		assert.Equal(t, offsets[i][0], regions[i].Chunk.Offset)
+		assert.Equal(t, offsets[i][1], regions[i].Chunk.EndOffset)
+		assert.Equal(t, columns, regions[i].Chunk.Columns)
+	}
+
+	// test - gzip compression
+	fileInfo.FileMeta.Compression = CompressionGZ
+	regions, _, err = MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
+	assert.NoError(t, err)
+	assert.Len(t, regions, 1)
+	assert.Equal(t, int64(0), regions[0].Chunk.Offset)
+	assert.Equal(t, fileInfo.FileMeta.FileSize, regions[0].Chunk.EndOffset)
+	assert.Len(t, regions[0].Chunk.Columns, 0)
+}
+
 func TestSplitLargeFile(t *testing.T) {
 	meta := &MDTableMeta{
 		DB:   "csv",
diff --git a/br/pkg/lightning/mydump/router.go b/br/pkg/lightning/mydump/router.go
index 75a9c61a98553..5b6263610fdf6 100644
--- a/br/pkg/lightning/mydump/router.go
+++ b/br/pkg/lightning/mydump/router.go
@@ -65,6 +65,10 @@ const (
 	CompressionZStd
 	// CompressionXZ is the compression type that uses XZ algorithm.
 	CompressionXZ
+	// CompressionLZO is the compression type that uses LZO algorithm.
+	CompressionLZO
+	// CompressionSnappy is the compression type that uses Snappy algorithm.
+	CompressionSnappy
 )
 
 func parseSourceType(t string) (SourceType, error) {
@@ -109,7 +113,7 @@ func (s SourceType) String() string {
 
 func parseCompressionType(t string) (Compression, error) {
 	switch strings.ToLower(strings.TrimSpace(t)) {
-	case "gz":
+	case "gz", "gzip":
 		return CompressionGZ, nil
 	case "lz4":
 		return CompressionLZ4, nil
@@ -117,6 +121,10 @@ func parseCompressionType(t string) (Compression, error) {
 		return CompressionZStd, nil
 	case "xz":
 		return CompressionXZ, nil
+	case "lzo":
+		return CompressionLZO, nil
+	case "snappy":
+		return CompressionSnappy, nil
 	case "":
 		return CompressionNone, nil
 	default:
@@ -128,15 +136,15 @@ var expandVariablePattern = regexp.MustCompile(`\$(?:\$|[\pL\p{Nd}_]+|\{[\pL\p{N
 
 var defaultFileRouteRules = []*config.FileRouteRule{
 	// ignore *-schema-trigger.sql, *-schema-post.sql files
-	{Pattern: `(?i).*(-schema-trigger|-schema-post)\.sql$`, Type: "ignore"},
-	// db schema create file pattern, matches files like '{schema}-schema-create.sql'
-	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$`, Schema: "$1", Table: "", Type: SchemaSchema, Unescape: true},
-	// table schema create file pattern, matches files like '{schema}.{table}-schema.sql'
-	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$`, Schema: "$1", Table: "$2", Type: TableSchema, Unescape: true},
-	// view schema create file pattern, matches files like '{schema}.{table}-schema-view.sql'
-	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema-view\.sql$`, Schema: "$1", Table: "$2", Type: ViewSchema, Unescape: true},
-	// source file pattern, matches files like '{schema}.{table}.0001.{sql|csv}'
-	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)$`, Schema: "$1", Table: "$2", Type: "$4", Key: "$3", Unescape: true},
+	{Pattern: `(?i).*(-schema-trigger|-schema-post)\.sql(?:\.(\w*?))?$`, Type: "ignore"},
+	// db schema create file pattern, matches files like '{schema}-schema-create.sql[.{compress}]'
+	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql(?:\.(\w*?))?$`, Schema: "$1", Table: "", Type: SchemaSchema, Compression: "$2", Unescape: true},
+	// table schema create file pattern, matches files like '{schema}.{table}-schema.sql[.{compress}]'
+	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql(?:\.(\w*?))?$`, Schema: "$1", Table: "$2", Type: TableSchema, Compression: "$3", Unescape: true},
+	// view schema create file pattern, matches files like '{schema}.{table}-schema-view.sql[.{compress}]'
+	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema-view\.sql(?:\.(\w*?))?$`, Schema: "$1", Table: "$2", Type: ViewSchema, Compression: "$3", Unescape: true},
+	// source file pattern, matches files like '{schema}.{table}.0001.{sql|csv}[.{compress}]'
+	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)(?:\.(\w+))?$`, Schema: "$1", Table: "$2", Type: "$4", Key: "$3", Compression: "$5", Unescape: true},
 }
 
 // FileRouter provides some operations to apply a rule to route file path to target schema/table
@@ -292,9 +300,6 @@ func (p regexRouterParser) Parse(r *config.FileRouteRule, logger log.Logger) (*R
 		if err != nil {
 			return err
 		}
-		if compression != CompressionNone {
-			return errors.New("Currently we don't support restore compressed source file yet")
-		}
 		result.Compression = compression
 		return nil
 	})
diff --git a/br/pkg/lightning/mydump/router_test.go b/br/pkg/lightning/mydump/router_test.go
index 7401027cfbd36..4f57c0c541364 100644
--- a/br/pkg/lightning/mydump/router_test.go
+++ b/br/pkg/lightning/mydump/router_test.go
@@ -38,6 +38,34 @@ func TestRouteParser(t *testing.T) {
 	}
 }
 
+func TestDefaultRouter(t *testing.T) {
+	r, err := NewFileRouter(defaultFileRouteRules, log.L())
+	assert.NoError(t, err)
+
+	inputOutputMap := map[string][]string{
+		"a/test-schema-create.sql":            {"test", "", "", "", SchemaSchema},
+		"test-schema-create.sql.gz":           {"test", "", "", "gz", SchemaSchema},
+		"c/d/test.t-schema.sql":               {"test", "t", "", "", TableSchema},
+		"test.t-schema.sql.lzo":               {"test", "t", "", "lzo", TableSchema},
+		"/bc/dc/test.v1-schema-view.sql":      {"test", "v1", "", "", ViewSchema},
+		"test.v1-schema-view.sql.snappy":      {"test", "v1", "", "snappy", ViewSchema},
+		"my_schema.my_table.sql":              {"my_schema", "my_table", "", "", "sql"},
+		"/test/123/my_schema.my_table.sql.gz": {"my_schema", "my_table", "", "gz", "sql"},
+		"my_dir/my_schema.my_table.csv.lzo":   {"my_schema", "my_table", "", "lzo", "csv"},
+		"my_schema.my_table.0001.sql.snappy":  {"my_schema", "my_table", "0001", "snappy", "sql"},
+	}
+	for path, fields := range inputOutputMap {
+		res, err := r.Route(path)
+		assert.NoError(t, err)
+		compress, e := parseCompressionType(fields[3])
+		assert.NoError(t, e)
+		ty, e := parseSourceType(fields[4])
+		assert.NoError(t, e)
+		exp := &RouteResult{filter.Table{Schema: fields[0], Name: fields[1]}, fields[2], compress, ty}
+		assert.Equal(t, exp, res)
+	}
+}
+
 func TestInvalidRouteRule(t *testing.T) {
 	rule := &config.FileRouteRule{}
 	rules := []*config.FileRouteRule{rule}
@@ -112,7 +140,6 @@ func TestSingleRouteRule(t *testing.T) {
 	require.NoError(t, err)
 	require.NotNil(t, r)
 	invalidMatchPaths := []string{
-		"my_schema.my_table.sql.gz",
 		"my_schema.my_table.sql.rar",
 		"my_schema.my_table.txt",
 	}
@@ -121,6 +148,11 @@ func TestSingleRouteRule(t *testing.T) {
 		assert.Nil(t, res)
 		assert.Error(t, err)
 	}
+
+	res, err := r.Route("my_schema.my_table.sql.gz")
+	assert.NoError(t, err)
+	exp := &RouteResult{filter.Table{Schema: "my_schema", Name: "my_table"}, "", CompressionGZ, SourceTypeSQL}
+	assert.Equal(t, exp, res)
 }
 
 func TestMultiRouteRule(t *testing.T) {
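
Side note (not part of the patch): a minimal sketch of how the relaxed default route rules are expected to behave for a compressed data file. It assumes placement in the mydump package next to router_test.go with that file's imports; the test name and sample path are made up for illustration, while every other identifier comes from the diff above and mirrors TestDefaultRouter.

// Sketch only: with the updated defaultFileRouteRules, a gzip-compressed CSV
// data file is routed with its compression recorded instead of being rejected.
func TestRouteCompressedDataFileSketch(t *testing.T) {
	r, err := NewFileRouter(defaultFileRouteRules, log.L())
	require.NoError(t, err)

	res, err := r.Route("my_schema.my_table.0001.csv.gz")
	require.NoError(t, err)
	exp := &RouteResult{filter.Table{Schema: "my_schema", Name: "my_table"}, "0001", CompressionGZ, SourceTypeCSV}
	require.Equal(t, exp, res)
}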