lightning: support checkpoint read for compress files #38946

Merged
merged 9 commits on Nov 11, 2022
6 changes: 6 additions & 0 deletions br/pkg/lightning/mydump/parquet_parser.go
@@ -579,6 +579,12 @@ func (pp *ParquetParser) SetLogger(l log.Logger) {
	pp.logger = l
}

// SetRowID sets the rowID in a parquet file when we start reading a compressed file.
// It implements the Parser interface.
func (pp *ParquetParser) SetRowID(rowID int64) {
	pp.lastRow.RowID = rowID
}

func jdToTime(jd int32, nsec int64) time.Time {
	sec := int64(jd-jan011970) * secPerDay
	// it's fine not to check the value of nsec
27 changes: 27 additions & 0 deletions br/pkg/lightning/mydump/parser.go
@@ -138,6 +138,8 @@ type Parser interface {
	SetColumns([]string)

	SetLogger(log.Logger)

	SetRowID(rowID int64)
}

// NewChunkParser creates a new parser which can read chunks out of a file.
@@ -174,6 +176,7 @@ func (parser *blockParser) SetPos(pos int64, rowID int64) error {
}

// Pos returns the current file offset.
// Attention: for compressed sql/csv files, pos is the position in the uncompressed file.
func (parser *blockParser) Pos() (pos int64, lastRowID int64) {
	return parser.pos, parser.lastRow.RowID
}
@@ -205,6 +208,11 @@ func (parser *blockParser) SetLogger(logger log.Logger) {
	parser.Logger = logger
}

// SetRowID changes the reported row ID when we start reading a compressed file.
func (parser *blockParser) SetRowID(rowID int64) {
	parser.lastRow.RowID = rowID
}

type token byte

const (
@@ -592,3 +600,22 @@ func ReadChunks(parser Parser, minSize int64) ([]Chunk, error) {
		}
	}
}

// ReadUntil reads rows from the parser until its reported position reaches or
// exceeds pos. It is used to skip to a checkpoint offset in files whose
// readers cannot seek, such as compressed files.
func ReadUntil(parser Parser, pos int64) error {
	var curOffset int64
	for curOffset < pos {
		switch err := parser.ReadRow(); errors.Cause(err) {
		case nil:
			curOffset, _ = parser.Pos()

		case io.EOF:
			return nil

		default:
			return errors.Trace(err)
		}
	}
	return nil
}
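The two additions above (Parser.SetRowID and ReadUntil) are what make checkpoint resume work for compressed files: the underlying reader cannot seek, so the parser re-reads rows up to the checkpointed (uncompressed) offset and then restores the row ID counter. A minimal sketch of that flow, assuming a parser already opened over the decompressed stream; resumeCompressedChunk is an illustrative helper, not part of this PR — the real wiring is in newChunkRestore further down.

package example

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
)

// resumeCompressedChunk skips rows until the parser reports the checkpointed
// offset (measured in uncompressed bytes), then restores the row ID counter.
func resumeCompressedChunk(parser mydump.Parser, offset, prevRowIDMax int64) error {
	if err := mydump.ReadUntil(parser, offset); err != nil {
		return errors.Trace(err)
	}
	parser.SetRowID(prevRowIDMax)
	return nil
}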
17 changes: 17 additions & 0 deletions br/pkg/lightning/mydump/router.go
@@ -9,6 +9,7 @@ import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
	"github.com/pingcap/tidb/br/pkg/storage"
	"github.com/pingcap/tidb/util/filter"
	"github.com/pingcap/tidb/util/slice"
	"go.uber.org/zap"
@@ -71,6 +72,22 @@ const (
	CompressionSnappy
)

// ToStorageCompressType converts Compression to storage.CompressType.
func ToStorageCompressType(compression Compression) (storage.CompressType, error) {
	switch compression {
	case CompressionGZ:
		return storage.Gzip, nil
	case CompressionSnappy:
		return storage.Snappy, nil
	case CompressionZStd:
		return storage.Zstd, nil
	case CompressionNone:
		return storage.NoCompression, nil
	default:
		return storage.NoCompression, errors.Errorf("compression %d doesn't have related storage compressType", compression)
	}
}

func parseSourceType(t string) (SourceType, error) {
	switch strings.ToLower(strings.TrimSpace(t)) {
	case SchemaSchema:
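For reference, this is roughly how the new ToStorageCompressType helper is consumed together with storage.WithCompression when a compressed data file is opened (compare the newChunkRestore change further down); openCompressed here is only an illustrative wrapper, and the store/path arguments are placeholders.

package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/storage"
)

// openCompressed maps the mydump compression kind to the storage layer's
// CompressType and opens the file through a decompressing reader.
func openCompressed(ctx context.Context, store storage.ExternalStorage, path string, c mydump.Compression) (storage.ReadSeekCloser, error) {
	compressType, err := mydump.ToStorageCompressType(c)
	if err != nil {
		return nil, errors.Trace(err)
	}
	var reader storage.ReadSeekCloser
	reader, err = storage.WithCompression(store, compressType).Open(ctx, path)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return reader, nil
}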
126 changes: 126 additions & 0 deletions br/pkg/lightning/restore/chunk_restore_test.go
@@ -15,9 +15,13 @@
package restore

import (
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"testing"

@@ -40,8 +44,10 @@ import (
	"github.com/pingcap/tidb/parser"
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/types"
	tmock "github.com/pingcap/tidb/util/mock"
	filter "github.com/pingcap/tidb/util/table-filter"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
)
@@ -654,3 +660,123 @@ func (s *chunkRestoreSuite) TestRestore() {
	require.NoError(s.T(), err)
	require.Len(s.T(), saveCpCh, 2)
}

func TestCompressChunkRestore(t *testing.T) {
	// Produce a mock table info
	p := parser.New()
	p.SetSQLMode(mysql.ModeANSIQuotes)
	node, err := p.ParseOneStmt(`
	CREATE TABLE "table" (
		a INT,
		b INT,
		c INT,
		KEY (b)
	)
`, "", "")
	require.NoError(t, err)
	core, err := ddl.BuildTableInfoFromAST(node.(*ast.CreateTableStmt))
	require.NoError(t, err)
	core.State = model.StatePublic

	// Write some sample CSV dump
	fakeDataDir := t.TempDir()
	store, err := storage.NewLocalStorage(fakeDataDir)
	require.NoError(t, err)

	fakeDataFiles := make([]mydump.FileInfo, 0)

	csvName := "db.table.1.csv.gz"
	file, err := os.Create(filepath.Join(fakeDataDir, csvName))
	require.NoError(t, err)
	gzWriter := gzip.NewWriter(file)

	var totalBytes int64
	for i := 0; i < 300; i += 3 {
		n, err := gzWriter.Write([]byte(fmt.Sprintf("%d,%d,%d\r\n", i, i+1, i+2)))
		require.NoError(t, err)
		totalBytes += int64(n)
	}

	err = gzWriter.Close()
	require.NoError(t, err)
	err = file.Close()
	require.NoError(t, err)

	fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{
		TableName: filter.Table{Schema: "db", Name: "table"},
		FileMeta: mydump.SourceFileMeta{
			Path:        csvName,
			Type:        mydump.SourceTypeCSV,
			Compression: mydump.CompressionGZ,
			SortKey:     "99",
			FileSize:    totalBytes,
		},
	})

	chunk := checkpoints.ChunkCheckpoint{
		Key:      checkpoints.ChunkCheckpointKey{Path: fakeDataFiles[0].FileMeta.Path, Offset: 0},
		FileMeta: fakeDataFiles[0].FileMeta,
		Chunk: mydump.Chunk{
			Offset:       0,
			EndOffset:    totalBytes,
			PrevRowIDMax: 0,
			RowIDMax:     100,
		},
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	w := worker.NewPool(ctx, 5, "io")
	cfg := config.NewConfig()
	cfg.Mydumper.BatchSize = 111
	cfg.App.TableConcurrency = 2
	cfg.Mydumper.CSV.Header = false

	cr, err := newChunkRestore(ctx, 1, cfg, &chunk, w, store, nil)
	require.NoError(t, err)
	var (
		id, lastID int
		offset     int64 = 0
		rowID      int64 = 0
	)
	for id < 100 {
		offset, rowID = cr.parser.Pos()
		err = cr.parser.ReadRow()
		require.NoError(t, err)
		rowData := cr.parser.LastRow().Row
		require.Len(t, rowData, 3)
		lastID = id
		for i := 0; id < 100 && i < 3; i++ {
			require.Equal(t, strconv.Itoa(id), rowData[i].GetString())
			id++
		}
	}
	require.Equal(t, int64(33), rowID)

	// test reading from the middle of the compressed file
	chunk = checkpoints.ChunkCheckpoint{
		Key:      checkpoints.ChunkCheckpointKey{Path: fakeDataFiles[0].FileMeta.Path, Offset: offset},
		FileMeta: fakeDataFiles[0].FileMeta,
		Chunk: mydump.Chunk{
			Offset:       offset,
			EndOffset:    totalBytes,
			PrevRowIDMax: rowID,
			RowIDMax:     100,
		},
	}
	cr, err = newChunkRestore(ctx, 1, cfg, &chunk, w, store, nil)
	require.NoError(t, err)
	for id = lastID; id < 300; {
		err = cr.parser.ReadRow()
		require.NoError(t, err)
		rowData := cr.parser.LastRow().Row
		require.Len(t, rowData, 3)
		for i := 0; id < 300 && i < 3; i++ {
			require.Equal(t, strconv.Itoa(id), rowData[i].GetString())
			id++
		}
	}
	_, rowID = cr.parser.Pos()
	require.Equal(t, int64(100), rowID)
	err = cr.parser.ReadRow()
	require.Equal(t, io.EOF, errors.Cause(err))
}
29 changes: 23 additions & 6 deletions br/pkg/lightning/restore/restore.go
@@ -2190,11 +2190,21 @@ func newChunkRestore(
) (*chunkRestore, error) {
	blockBufSize := int64(cfg.Mydumper.ReadBlockSize)

	var reader storage.ReadSeekCloser
	var err error
	if chunk.FileMeta.Type == mydump.SourceTypeParquet {
	var (
		reader       storage.ReadSeekCloser
		compressType storage.CompressType
		err          error
	)
	switch {
	case chunk.FileMeta.Type == mydump.SourceTypeParquet:
		reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize)
	} else {
	case chunk.FileMeta.Compression != mydump.CompressionNone:
		compressType, err = mydump.ToStorageCompressType(chunk.FileMeta.Compression)
		if err != nil {
			break
		}
		reader, err = storage.WithCompression(store, compressType).Open(ctx, chunk.FileMeta.Path)
	default:
		reader, err = store.Open(ctx, chunk.FileMeta.Path)
	}
	if err != nil {
@@ -2225,8 +2235,15 @@
		panic(fmt.Sprintf("file '%s' with unknown source type '%s'", chunk.Key.Path, chunk.FileMeta.Type.String()))
	}

	if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
		return nil, errors.Trace(err)
	if chunk.FileMeta.Compression == mydump.CompressionNone {
		if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
			return nil, errors.Trace(err)
		}
	} else {
		if err = mydump.ReadUntil(parser, chunk.Chunk.Offset); err != nil {
			return nil, errors.Trace(err)
		}
		parser.SetRowID(chunk.Chunk.PrevRowIDMax)
	}
	if len(chunk.ColumnPermutation) > 0 {
		parser.SetColumns(getColumnNames(tableInfo.Core, chunk.ColumnPermutation))
20 changes: 11 additions & 9 deletions br/pkg/storage/writer.go
@@ -48,16 +48,18 @@ type interceptBuffer interface {
}

func createSuffixString(compressType CompressType) string {
	if compressType == Gzip {
		return ".txt.gz"
	}
	if compressType == Snappy {
		return ".txt.snappy"
	}
	if compressType == Zstd {
		return ".txt.zst"
	txtSuffix := ".txt"
	switch compressType {
	case Gzip:
		txtSuffix += ".gz"
	case Snappy:
		txtSuffix += ".snappy"
	case Zstd:
		txtSuffix += ".zst"
	default:
		return ""
	}
	return ""
	return txtSuffix
}

func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer {
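A quick table-driven check of the refactored suffix mapping could look like this; it would have to live in the storage package itself, since createSuffixString is unexported, and the test name is illustrative.

package storage

import "testing"

func TestCreateSuffixString(t *testing.T) {
	// Expected suffix for each compression kind; unknown kinds map to "".
	cases := map[CompressType]string{
		Gzip:          ".txt.gz",
		Snappy:        ".txt.snappy",
		Zstd:          ".txt.zst",
		NoCompression: "",
	}
	for compressType, want := range cases {
		if got := createSuffixString(compressType); got != want {
			t.Errorf("createSuffixString(%v) = %q, want %q", compressType, got, want)
		}
	}
}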