Skip to content

Commit

Permalink
lightning: support checkpoint read for compress files (#38946)
Browse files Browse the repository at this point in the history
ref #38514
  • Loading branch information
lichunzhu committed Nov 11, 2022
1 parent f16209f commit 481f5ab
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 15 deletions.
6 changes: 6 additions & 0 deletions br/pkg/lightning/mydump/parquet_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,12 @@ func (pp *ParquetParser) SetLogger(l log.Logger) {
pp.logger = l
}

// SetRowID sets the rowID in a parquet file when we start a compressed file.
// It implements the Parser interface.
func (pp *ParquetParser) SetRowID(rowID int64) {
pp.lastRow.RowID = rowID
}

func jdToTime(jd int32, nsec int64) time.Time {
sec := int64(jd-jan011970) * secPerDay
// it's fine not to check the value of nsec
Expand Down
27 changes: 27 additions & 0 deletions br/pkg/lightning/mydump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ type Parser interface {
SetColumns([]string)

SetLogger(log.Logger)

SetRowID(rowID int64)
}

// NewChunkParser creates a new parser which can read chunks out of a file.
Expand Down Expand Up @@ -174,6 +176,7 @@ func (parser *blockParser) SetPos(pos int64, rowID int64) error {
}

// Pos returns the current file offset.
// Attention: for compressed sql/csv files, pos is the position in uncompressed files
func (parser *blockParser) Pos() (pos int64, lastRowID int64) {
return parser.pos, parser.lastRow.RowID
}
Expand Down Expand Up @@ -205,6 +208,11 @@ func (parser *blockParser) SetLogger(logger log.Logger) {
parser.Logger = logger
}

// SetRowID changes the reported row ID when we firstly read compressed files.
func (parser *blockParser) SetRowID(rowID int64) {
parser.lastRow.RowID = rowID
}

type token byte

const (
Expand Down Expand Up @@ -592,3 +600,22 @@ func ReadChunks(parser Parser, minSize int64) ([]Chunk, error) {
}
}
}

// ReadUntil parses the entire file and splits it into continuous chunks of
// size >= minSize.
func ReadUntil(parser Parser, pos int64) error {
var curOffset int64
for curOffset < pos {
switch err := parser.ReadRow(); errors.Cause(err) {
case nil:
curOffset, _ = parser.Pos()

case io.EOF:
return nil

default:
return errors.Trace(err)
}
}
return nil
}
17 changes: 17 additions & 0 deletions br/pkg/lightning/mydump/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/filter"
"github.com/pingcap/tidb/util/slice"
"go.uber.org/zap"
Expand Down Expand Up @@ -71,6 +72,22 @@ const (
CompressionSnappy
)

// ToStorageCompressType converts Compression to storage.CompressType.
func ToStorageCompressType(compression Compression) (storage.CompressType, error) {
switch compression {
case CompressionGZ:
return storage.Gzip, nil
case CompressionSnappy:
return storage.Snappy, nil
case CompressionZStd:
return storage.Zstd, nil
case CompressionNone:
return storage.NoCompression, nil
default:
return storage.NoCompression, errors.Errorf("compression %d doesn't have related storage compressType", compression)
}
}

func parseSourceType(t string) (SourceType, error) {
switch strings.ToLower(strings.TrimSpace(t)) {
case SchemaSchema:
Expand Down
126 changes: 126 additions & 0 deletions br/pkg/lightning/restore/chunk_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
package restore

import (
"compress/gzip"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"sync"
"testing"

Expand All @@ -40,8 +44,10 @@ import (
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
tmock "github.com/pingcap/tidb/util/mock"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -654,3 +660,123 @@ func (s *chunkRestoreSuite) TestRestore() {
require.NoError(s.T(), err)
require.Len(s.T(), saveCpCh, 2)
}

func TestCompressChunkRestore(t *testing.T) {
// Produce a mock table info
p := parser.New()
p.SetSQLMode(mysql.ModeANSIQuotes)
node, err := p.ParseOneStmt(`
CREATE TABLE "table" (
a INT,
b INT,
c INT,
KEY (b)
)
`, "", "")
require.NoError(t, err)
core, err := ddl.BuildTableInfoFromAST(node.(*ast.CreateTableStmt))
require.NoError(t, err)
core.State = model.StatePublic

// Write some sample CSV dump
fakeDataDir := t.TempDir()
store, err := storage.NewLocalStorage(fakeDataDir)
require.NoError(t, err)

fakeDataFiles := make([]mydump.FileInfo, 0)

csvName := "db.table.1.csv.gz"
file, err := os.Create(filepath.Join(fakeDataDir, csvName))
require.NoError(t, err)
gzWriter := gzip.NewWriter(file)

var totalBytes int64
for i := 0; i < 300; i += 3 {
n, err := gzWriter.Write([]byte(fmt.Sprintf("%d,%d,%d\r\n", i, i+1, i+2)))
require.NoError(t, err)
totalBytes += int64(n)
}

err = gzWriter.Close()
require.NoError(t, err)
err = file.Close()
require.NoError(t, err)

fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{
TableName: filter.Table{Schema: "db", Name: "table"},
FileMeta: mydump.SourceFileMeta{
Path: csvName,
Type: mydump.SourceTypeCSV,
Compression: mydump.CompressionGZ,
SortKey: "99",
FileSize: totalBytes,
},
})

chunk := checkpoints.ChunkCheckpoint{
Key: checkpoints.ChunkCheckpointKey{Path: fakeDataFiles[0].FileMeta.Path, Offset: 0},
FileMeta: fakeDataFiles[0].FileMeta,
Chunk: mydump.Chunk{
Offset: 0,
EndOffset: totalBytes,
PrevRowIDMax: 0,
RowIDMax: 100,
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
w := worker.NewPool(ctx, 5, "io")
cfg := config.NewConfig()
cfg.Mydumper.BatchSize = 111
cfg.App.TableConcurrency = 2
cfg.Mydumper.CSV.Header = false

cr, err := newChunkRestore(ctx, 1, cfg, &chunk, w, store, nil)
require.NoError(t, err)
var (
id, lastID int
offset int64 = 0
rowID int64 = 0
)
for id < 100 {
offset, rowID = cr.parser.Pos()
err = cr.parser.ReadRow()
require.NoError(t, err)
rowData := cr.parser.LastRow().Row
require.Len(t, rowData, 3)
lastID = id
for i := 0; id < 100 && i < 3; i++ {
require.Equal(t, strconv.Itoa(id), rowData[i].GetString())
id++
}
}
require.Equal(t, int64(33), rowID)

// test read starting from compress files' middle
chunk = checkpoints.ChunkCheckpoint{
Key: checkpoints.ChunkCheckpointKey{Path: fakeDataFiles[0].FileMeta.Path, Offset: offset},
FileMeta: fakeDataFiles[0].FileMeta,
Chunk: mydump.Chunk{
Offset: offset,
EndOffset: totalBytes,
PrevRowIDMax: rowID,
RowIDMax: 100,
},
}
cr, err = newChunkRestore(ctx, 1, cfg, &chunk, w, store, nil)
require.NoError(t, err)
for id = lastID; id < 300; {
err = cr.parser.ReadRow()
require.NoError(t, err)
rowData := cr.parser.LastRow().Row
require.Len(t, rowData, 3)
for i := 0; id < 300 && i < 3; i++ {
require.Equal(t, strconv.Itoa(id), rowData[i].GetString())
id++
}
}
_, rowID = cr.parser.Pos()
require.Equal(t, int64(100), rowID)
err = cr.parser.ReadRow()
require.Equal(t, io.EOF, errors.Cause(err))
}
29 changes: 23 additions & 6 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2190,11 +2190,21 @@ func newChunkRestore(
) (*chunkRestore, error) {
blockBufSize := int64(cfg.Mydumper.ReadBlockSize)

var reader storage.ReadSeekCloser
var err error
if chunk.FileMeta.Type == mydump.SourceTypeParquet {
var (
reader storage.ReadSeekCloser
compressType storage.CompressType
err error
)
switch {
case chunk.FileMeta.Type == mydump.SourceTypeParquet:
reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize)
} else {
case chunk.FileMeta.Compression != mydump.CompressionNone:
compressType, err = mydump.ToStorageCompressType(chunk.FileMeta.Compression)
if err != nil {
break
}
reader, err = storage.WithCompression(store, compressType).Open(ctx, chunk.FileMeta.Path)
default:
reader, err = store.Open(ctx, chunk.FileMeta.Path)
}
if err != nil {
Expand Down Expand Up @@ -2225,8 +2235,15 @@ func newChunkRestore(
panic(fmt.Sprintf("file '%s' with unknown source type '%s'", chunk.Key.Path, chunk.FileMeta.Type.String()))
}

if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
return nil, errors.Trace(err)
if chunk.FileMeta.Compression == mydump.CompressionNone {
if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
return nil, errors.Trace(err)
}
} else {
if err = mydump.ReadUntil(parser, chunk.Chunk.Offset); err != nil {
return nil, errors.Trace(err)
}
parser.SetRowID(chunk.Chunk.PrevRowIDMax)
}
if len(chunk.ColumnPermutation) > 0 {
parser.SetColumns(getColumnNames(tableInfo.Core, chunk.ColumnPermutation))
Expand Down
20 changes: 11 additions & 9 deletions br/pkg/storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,18 @@ type interceptBuffer interface {
}

func createSuffixString(compressType CompressType) string {
if compressType == Gzip {
return ".txt.gz"
}
if compressType == Snappy {
return ".txt.snappy"
}
if compressType == Zstd {
return ".txt.zst"
txtSuffix := ".txt"
switch compressType {
case Gzip:
txtSuffix += ".gz"
case Snappy:
txtSuffix += ".snappy"
case Zstd:
txtSuffix += ".zst"
default:
return ""
}
return ""
return txtSuffix
}

func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer {
Expand Down

0 comments on commit 481f5ab

Please sign in to comment.