Merge branch 'master' into s13-add-warning-to-slow-query-part1

time-and-fate authored Dec 19, 2022
2 parents 5ffab0b + 0c18082 commit 81701ab
Showing 134 changed files with 6,235 additions and 4,388 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
@@ -1,4 +1,4 @@
-startup --host_jvm_args=-Xmx5g
+startup --host_jvm_args=-Xmx8g
startup --unlimit_coredumps

run:ci --color=yes
12 changes: 6 additions & 6 deletions DEPS.bzl
@@ -4425,8 +4425,8 @@ def go_deps():
name = "org_golang_x_net",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/net",
sum = "h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU=",
version = "v0.2.0",
sum = "h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=",
version = "v0.4.0",
)
go_repository(
name = "org_golang_x_oauth2",
@@ -4453,15 +4453,15 @@ def go_deps():
name = "org_golang_x_term",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/term",
sum = "h1:z85xZCsEl7bi/KwbNADeBYoOP0++7W1ipu+aGnpwzRM=",
version = "v0.2.0",
sum = "h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_text",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/text",
sum = "h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=",
version = "v0.4.0",
sum = "h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=",
version = "v0.5.0",
)
go_repository(
name = "org_golang_x_time",
6 changes: 5 additions & 1 deletion br/pkg/aws/BUILD.bazel
@@ -25,5 +25,9 @@ go_test(
name = "aws_test",
srcs = ["ebs_test.go"],
embed = [":aws"],
deps = ["@com_github_stretchr_testify//require"],
deps = [
"@com_github_aws_aws_sdk_go//aws",
"@com_github_aws_aws_sdk_go//service/ec2",
"@com_github_stretchr_testify//require",
],
)
31 changes: 20 additions & 11 deletions br/pkg/aws/ebs.go
@@ -307,17 +307,9 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress
return 0, errors.Trace(err)
}

-var unfinishedVolumes []*string
-for _, volume := range resp.Volumes {
-	if *volume.State == ec2.VolumeStateAvailable {
-		log.Info("volume is available", zap.String("id", *volume.SnapshotId))
-		totalVolumeSize += *volume.Size
-		progress.Inc()
-	} else {
-		log.Debug("volume creating...", zap.Stringer("volume", volume))
-		unfinishedVolumes = append(unfinishedVolumes, volume.SnapshotId)
-	}
-}
+createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(resp)
+progress.IncBy(int64(len(pendingVolumes) - len(unfinishedVolumes)))
+totalVolumeSize += createdVolumeSize
pendingVolumes = unfinishedVolumes
}
log.Info("all pending volume are created.")
@@ -357,3 +349,20 @@ func (e *EC2Session) DeleteVolumes(volumeIDMap map[string]string) {
func ec2Tag(key, val string) *ec2.Tag {
return &ec2.Tag{Key: &key, Value: &val}
}

// HandleDescribeVolumesResponse sums the sizes of the volumes that are already
// available and collects the IDs of those still being created, returning both.
func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput) (int64, []*string) {
totalVolumeSize := int64(0)

var unfinishedVolumes []*string
for _, volume := range resp.Volumes {
if *volume.State == ec2.VolumeStateAvailable {
log.Info("volume is available", zap.String("id", *volume.VolumeId))
totalVolumeSize += *volume.Size
} else {
log.Debug("volume creating...", zap.Stringer("volume", volume))
unfinishedVolumes = append(unfinishedVolumes, volume.VolumeId)
}
}

return totalVolumeSize, unfinishedVolumes
}
53 changes: 45 additions & 8 deletions br/pkg/aws/ebs_test.go
@@ -16,26 +16,63 @@ package aws
import (
"testing"

awsapi "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/stretchr/testify/require"
)

func TestEC2SessionExtractSnapProgress(t *testing.T) {
-strPtr := func(s string) *string {
-	return &s
-}
tests := []struct {
str *string
want int64
}{
{nil, 0},
{strPtr("12.12%"), 12},
{strPtr("44.99%"), 44},
{strPtr(" 89.89% "), 89},
{strPtr("100%"), 100},
{strPtr("111111%"), 100},
{awsapi.String("12.12%"), 12},
{awsapi.String("44.99%"), 44},
{awsapi.String(" 89.89% "), 89},
{awsapi.String("100%"), 100},
{awsapi.String("111111%"), 100},
}
e := &EC2Session{}
for _, tt := range tests {
require.Equal(t, tt.want, e.extractSnapProgress(tt.str))
}
}

func createVolume(snapshotId string, volumeId string, state string) *ec2.Volume {
return &ec2.Volume{
Attachments: nil,
AvailabilityZone: awsapi.String("us-west-2"),
CreateTime: nil,
Encrypted: awsapi.Bool(true),
FastRestored: awsapi.Bool(true),
Iops: awsapi.Int64(3000),
KmsKeyId: nil,
MultiAttachEnabled: awsapi.Bool(true),
OutpostArn: awsapi.String("arn:12342"),
Size: awsapi.Int64(1),
SnapshotId: awsapi.String(snapshotId),
State: awsapi.String(state),
Tags: nil,
Throughput: nil,
VolumeId: awsapi.String(volumeId),
VolumeType: awsapi.String("gp3"),
}
}

func TestHandleDescribeVolumesResponse(t *testing.T) {
currentVolumesStates := &ec2.DescribeVolumesOutput{
NextToken: awsapi.String("fake token"),
Volumes: []*ec2.Volume{
createVolume("snap-0873674883", "vol-98768979", "available"),
createVolume("snap-0873674883", "vol-98768979", "creating"),
createVolume("snap-0873674883", "vol-98768979", "available"),
createVolume("snap-0873674883", "vol-98768979", "available"),
createVolume("snap-0873674883", "vol-98768979", "available"),
},
}

e := &EC2Session{}
createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(currentVolumesStates)
require.Equal(t, int64(4), createdVolumeSize)
require.Equal(t, 1, len(unfinishedVolumes))
}
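The expectations follow from the fixture: four of the five fake volumes are in the available state, each with Size 1 (GiB in the EC2 API), so the created size sums to 4, and the single creating volume is the one unfinished entry.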
99 changes: 96 additions & 3 deletions br/pkg/lightning/mydump/loader.go
@@ -16,6 +16,7 @@ package mydump

import (
"context"
"io"
"path/filepath"
"sort"
"strings"
@@ -30,6 +31,9 @@ import (
"go.uber.org/zap"
)

+// sampleCompressedFileSize represents how many bytes need to be sampled for compressed files
+const sampleCompressedFileSize = 4 * 1024

// MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
type MDDatabaseMeta struct {
Name string
@@ -82,7 +86,9 @@ type SourceFileMeta struct {
Compression Compression
SortKey string
FileSize int64
-ExtendData ExtendColumnData
+// WARNING: variables below are not persistent
+ExtendData ExtendColumnData
+RealSize   int64
}
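The new RealSize field is the loader's working estimate of the uncompressed data size: constructFileInfo below initializes it to FileSize and, for compressed sources, overwrites it with a sampled estimate. A minimal sketch of the relationship, with a made-up ratio (the 4.8 here is hypothetical, not a value from this commit):

meta := SourceFileMeta{FileSize: 1 << 20, RealSize: 1 << 20} // 1 MiB on disk
ratio := 4.8                                                 // sampled compress ratio (hypothetical)
meta.RealSize = int64(ratio * float64(meta.FileSize))        // estimate: ~4.8 MiB of actual data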

// NewMDTableMeta creates an Mydumper table meta with specified character set.
@@ -386,7 +392,7 @@ func (s *mdLoaderSetup) setup(ctx context.Context) error {
// set a dummy `FileInfo` here without file meta because we needn't restore the table schema
tableMeta, _, _ := s.insertTable(FileInfo{TableName: fileInfo.TableName})
tableMeta.DataFiles = append(tableMeta.DataFiles, fileInfo)
-tableMeta.TotalSize += fileInfo.FileMeta.FileSize
+tableMeta.TotalSize += fileInfo.FileMeta.RealSize
}

for _, dbMeta := range s.loader.dbs {
@@ -453,7 +459,7 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size

info := FileInfo{
TableName: filter.Table{Schema: res.Schema, Name: res.Name},
-FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size},
+FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size, RealSize: size},
}

if s.loader.shouldSkip(&info.TableName) {
@@ -470,6 +476,15 @@
case SourceTypeViewSchema:
s.viewSchemas = append(s.viewSchemas, info)
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
+if info.FileMeta.Compression != CompressionNone {
+	compressRatio, err2 := SampleFileCompressRatio(ctx, info.FileMeta, s.loader.GetStore())
+	if err2 != nil {
+		logger.Error("[loader] fail to calculate data file compress ratio",
+			zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type))
+	} else {
+		info.FileMeta.RealSize = int64(compressRatio * float64(info.FileMeta.FileSize))
+	}
+}
s.tableDatas = append(s.tableDatas, info)
}
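Note that sampling failures are non-fatal in the added block above: on error the code only logs, leaving RealSize at the FileSize default set earlier, so a compressed file whose ratio cannot be sampled still imports, just with a less accurate size estimate.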

@@ -648,3 +663,81 @@ func (l *MDLoader) GetDatabases() []*MDDatabaseMeta {
func (l *MDLoader) GetStore() storage.ExternalStorage {
return l.store
}

// calculateFileBytes reads the compressed file through a decompressing reader.
// With offset == 0 it reads one sample buffer and returns the compressed-file
// position (pos) where that read stopped; with a non-zero offset it reads all
// bytes the length-capped reader yields and returns the uncompressed total (tot).
func calculateFileBytes(ctx context.Context,
dataFile string,
compressType storage.CompressType,
store storage.ExternalStorage,
offset int64) (tot int, pos int64, err error) {
bytes := make([]byte, sampleCompressedFileSize)
reader, err := store.Open(ctx, dataFile)
if err != nil {
return 0, 0, errors.Trace(err)
}
defer reader.Close()

compressReader, err := storage.NewLimitedInterceptReader(reader, compressType, offset)
if err != nil {
return 0, 0, errors.Trace(err)
}

readBytes := func() error {
n, err2 := compressReader.Read(bytes)
if err2 != nil && errors.Cause(err2) != io.EOF && errors.Cause(err2) != io.ErrUnexpectedEOF {
return err2
}
tot += n
return err2
}

if offset == 0 {
err = readBytes()
if err != nil && errors.Cause(err) != io.EOF && errors.Cause(err) != io.ErrUnexpectedEOF {
return 0, 0, err
}
pos, err = compressReader.Seek(0, io.SeekCurrent)
if err != nil {
return 0, 0, errors.Trace(err)
}
return tot, pos, nil
}

for {
err = readBytes()
if err != nil {
break
}
}
if err != nil && errors.Cause(err) != io.EOF && errors.Cause(err) != io.ErrUnexpectedEOF {
return 0, 0, errors.Trace(err)
}
return tot, offset, nil
}

// SampleFileCompressRatio samples the compress ratio of the compressed file.
func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
if fileMeta.Compression == CompressionNone {
return 1, nil
}
compressType, err := ToStorageCompressType(fileMeta.Compression)
if err != nil {
return 0, err
}
// We use the following two-pass method to sample the compress ratio from the first few bytes of the file.
// 1. The first read aims to find a valid offset in the compressed file. If we kept reading, the compress reader
// would request more data from the file reader and buffer it in memory, so we could not compute an accurate
// compress ratio.
// 2. The second read limits the file reader to n bytes (n is the valid position found in the first read) and reads
// all the data out of the compress reader. The length m of the data read out is the uncompressed data length.
// Use m/n to compute the compress ratio.
// first read: find a valid end position in the compressed file
_, pos, err := calculateFileBytes(ctx, fileMeta.Path, compressType, store, 0)
if err != nil {
return 0, err
}
// second read: the underlying reader stops at the first read's valid position; compute the sample's compress ratio
tot, pos, err := calculateFileBytes(ctx, fileMeta.Path, compressType, store, pos)
if err != nil {
return 0, err
}
return float64(tot) / float64(pos), nil
}
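To make the two-pass arithmetic concrete, here is a small self-contained sketch with made-up numbers (the 4096 and 850 values are hypothetical; only the m/n formula comes from the code above):

package main

import "fmt"

func main() {
	// Hypothetical sample results, for illustration only.
	tot := 4096       // m: uncompressed bytes read out in the second pass
	pos := int64(850) // n: compressed bytes consumed at the first pass's stop point

	ratio := float64(tot) / float64(pos) // m/n, about 4.82

	fileSize := int64(1 << 20) // a 1 MiB compressed file on disk
	realSize := int64(ratio * float64(fileSize))
	fmt.Printf("ratio=%.2f estimated uncompressed size=%d bytes\n", ratio, realSize)
}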
33 changes: 33 additions & 0 deletions br/pkg/lightning/mydump/loader_test.go
@@ -15,6 +15,8 @@
package mydump_test

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"os"
@@ -1053,3 +1055,34 @@ func TestExternalDataRoutes(t *testing.T) {
require.Equal(t, expectedExtendVals[i], fileInfo.FileMeta.ExtendData.Values)
}
}

func TestSampleFileCompressRatio(t *testing.T) {
s := newTestMydumpLoaderSuite(t)
store, err := storage.NewLocalStorage(s.sourceDir)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

byteArray := make([]byte, 0, 4096)
bf := bytes.NewBuffer(byteArray)
compressWriter := gzip.NewWriter(bf)
csvData := []byte("aaaa\n")
for i := 0; i < 1000; i++ {
_, err = compressWriter.Write(csvData)
require.NoError(t, err)
}
err = compressWriter.Flush()
require.NoError(t, err)

fileName := "test_1.t1.csv.gz"
err = store.WriteFile(ctx, fileName, bf.Bytes())
require.NoError(t, err)

ratio, err := md.SampleFileCompressRatio(ctx, md.SourceFileMeta{
Path: fileName,
Compression: md.CompressionGZ,
}, store)
require.NoError(t, err)
require.InDelta(t, ratio, 5000.0/float64(bf.Len()), 1e-5)
}
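For reference, the expected ratio in the InDelta assertion follows directly from the fixture: the loop writes the 5-byte string "aaaa\n" 1,000 times, so 5,000 uncompressed bytes enter the gzip writer, and the expected compress ratio is 5000.0 divided by the compressed length bf.Len().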