Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dumpling: fix dumpling ignore file writer close error #45374

Merged
merged 3 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dumpling/export/ir_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ func newMultiQueriesChunk(queries []string, colLength int) *multiQueriesChunk {
func (td *multiQueriesChunk) Start(tctx *tcontext.Context, conn *sql.Conn) error {
td.tctx = tctx
td.conn = conn
td.SQLRowIter = nil
return nil
}

Expand Down
9 changes: 6 additions & 3 deletions dumpling/export/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,12 @@ func (m *globalMetadata) writeGlobalMetaData() error {
if err != nil {
return err
}
defer tearDown(m.tctx)

return write(m.tctx, fileWriter, m.String())
err = write(m.tctx, fileWriter, m.String())
tearDownErr := tearDown(m.tctx)
if err == nil {
return tearDownErr
}
return err
}

func getValidStr(str []string, idx int) string {
Expand Down
14 changes: 10 additions & 4 deletions dumpling/export/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,13 @@ func (w *Writer) tryToWriteTableData(tctx *tcontext.Context, meta TableMeta, ir
for {
fileWriter, tearDown := buildInterceptFileWriter(tctx, w.extStorage, fileName, conf.CompressType)
n, err := format.WriteInsert(tctx, conf, meta, ir, fileWriter, w.metrics)
tearDown(tctx)
tearDownErr := tearDown(tctx)
if err != nil {
return err
}
if tearDownErr != nil {
return tearDownErr
}

if w, ok := fileWriter.(*InterceptFileWriter); ok && !w.SomethingIsWritten {
break
Expand Down Expand Up @@ -279,13 +282,16 @@ func (w *Writer) writeMetaToFile(tctx *tcontext.Context, target, metaSQL string,
if err != nil {
return errors.Trace(err)
}
defer tearDown(tctx)

return WriteMeta(tctx, &metaData{
err = WriteMeta(tctx, &metaData{
target: target,
metaSQL: metaSQL,
specCmts: getSpecialComments(w.conf.ServerInfo.ServerType),
}, fileWriter)
tearDownErr := tearDown(tctx)
if err == nil {
return tearDownErr
}
return err
}

type outputFileNamer struct {
Expand Down
14 changes: 14 additions & 0 deletions dumpling/export/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/version"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/util/promutil"
Expand Down Expand Up @@ -71,6 +72,12 @@ func TestWriteTableMeta(t *testing.T) {
bytes, err := os.ReadFile(p)
require.NoError(t, err)
require.Equal(t, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n/*!40101 SET NAMES binary*/;\nCREATE TABLE t (a INT);\n", string(bytes))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/dumpling/export/FailToCloseMetaFile", "return(true)"))
defer failpoint.Disable("github.com/pingcap/tidb/dumpling/export/FailToCloseMetaFile=return(true)")

err = writer.WriteTableMeta("test", "t", "CREATE TABLE t (a INT)")
require.ErrorContains(t, err, "injected error: fail to close meta file")
}

func TestWriteViewMeta(t *testing.T) {
Expand Down Expand Up @@ -137,6 +144,13 @@ func TestWriteTableData(t *testing.T) {
"(3,'male','john@mail.com','020-1256','healthy'),\n" +
"(4,'female','sarah@mail.com','020-1235','healthy');\n"
require.Equal(t, expected, string(bytes))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/dumpling/export/FailToCloseDataFile", "return(true)"))
defer failpoint.Disable("github.com/pingcap/tidb/dumpling/export/FailToCloseDataFile=return(true)")

tableIR = newMockTableIR("test", "employee", data, specCmts, colTypes)
err = writer.WriteTableData(tableIR, tableIR, 0)
require.ErrorContains(t, err, "injected error: fail to close data file")
}

func TestWriteTableDataWithFileSize(t *testing.T) {
Expand Down
20 changes: 14 additions & 6 deletions dumpling/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
return errors.Trace(err)
}

func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context), error) {
func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
fileName += compressFileSuffix(compressType)
fullPath := s.URI() + "/" + fileName
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName)
Expand All @@ -462,20 +462,24 @@ func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName
return nil, nil, errors.Trace(err)
}
tctx.L().Debug("opened file", zap.String("path", fullPath))
tearDownRoutine := func(ctx context.Context) {
tearDownRoutine := func(ctx context.Context) error {
err := writer.Close(ctx)
failpoint.Inject("FailToCloseMetaFile", func(_ failpoint.Value) {
err = errors.New("injected error: fail to close meta file")
})
if err == nil {
return
return nil
}
err = errors.Trace(err)
tctx.L().Warn("fail to close file",
zap.String("path", fullPath),
zap.Error(err))
return err
}
return writer, tearDownRoutine, nil
}

func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(context.Context)) {
func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(context.Context) error) {
fileName += compressFileSuffix(compressType)
var writer storage.ExternalFileWriter
fullPath := s.URI() + "/" + fileName
Expand All @@ -497,17 +501,21 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
}
fileWriter.initRoutine = initRoutine

tearDownRoutine := func(ctx context.Context) {
tearDownRoutine := func(ctx context.Context) error {
if writer == nil {
return
return nil
}
pCtx.L().Debug("tear down lazy file writer...", zap.String("path", fullPath))
err := writer.Close(ctx)
failpoint.Inject("FailToCloseDataFile", func(_ failpoint.Value) {
err = errors.New("injected error: fail to close data file")
})
if err != nil {
pCtx.L().Warn("fail to close file",
zap.String("path", fullPath),
zap.Error(err))
}
return err
}
return fileWriter, tearDownRoutine
}
Expand Down
35 changes: 34 additions & 1 deletion dumpling/tests/basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,43 @@ run_dumpling -B "test_db" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
cnt=$(grep "IOTotalBytes=" ${DUMPLING_OUTPUT_DIR}/dumpling.log | grep -v "IOTotalBytes=0" | wc -l)
[ "$cnt" -ge 1 ]

export GO_FAILPOINTS=""
echo "Test for failing to close meta/data file"
export GO_FAILPOINTS="github.com/pingcap/tidb/dumpling/export/FailToCloseMetaFile=1*return"
rm ${DUMPLING_OUTPUT_DIR}/dumpling.log
set +e
run_dumpling -B "test_db" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
set -e
cnt=$(grep -w "dump failed error stack info" ${DUMPLING_OUTPUT_DIR}/dumpling.log|wc -l)
[ "$cnt" -ge 1 ]

# dumpling retry will make it succeed
export GO_FAILPOINTS="github.com/pingcap/tidb/dumpling/export/FailToCloseDataFile=1*return"
export DUMPLING_TEST_PORT=4000
run_sql "drop database if exists test_db;"
run_sql "create database test_db;"
run_sql "create table test_db.test_table (a int primary key);"
run_sql "insert into test_db.test_table values (1),(2),(3),(4),(5),(6),(7),(8);"
rm ${DUMPLING_OUTPUT_DIR}/dumpling.log
set +e
run_dumpling -B "test_db" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
set -e
cnt=$(grep -w "dump data successfully" ${DUMPLING_OUTPUT_DIR}/dumpling.log|wc -l)
[ "$cnt" -ge 1 ]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe check the output file has exactly 8 rows

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in 2191553

cnt=$(grep -w "(.*)" ${DUMPLING_OUTPUT_DIR}/test_db.test_table.000000000.sql|wc -l)
echo "records count is ${cnt}"
[ "$cnt" -eq 8 ]

export GO_FAILPOINTS="github.com/pingcap/tidb/dumpling/export/FailToCloseDataFile=5*return"
rm ${DUMPLING_OUTPUT_DIR}/dumpling.log
set +e
run_dumpling -B "test_db" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
set -e
cnt=$(grep -w "dump failed error stack info" ${DUMPLING_OUTPUT_DIR}/dumpling.log|wc -l)
[ "$cnt" -ge 1 ]

echo "Test for empty query result, should success."
run_sql "drop database if exists test_db;"
run_sql "create database test_db;"
run_sql "create table test_db.test_table (a int primary key);"
export GO_FAILPOINTS=""
run_dumpling --sql "select * from test_db.test_table" --filetype csv > ${DUMPLING_OUTPUT_DIR}/dumpling.log