Skip to content

Commit

Permalink
Merge branch 'master' into fix-backup-sysTable
Browse files Browse the repository at this point in the history
  • Loading branch information
joccau authored Nov 15, 2021
2 parents 5fd59cb + 64681dd commit 3ca590a
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 260 deletions.
323 changes: 112 additions & 211 deletions executor/aggfuncs/aggfunc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,109 +466,6 @@ func testMergePartialResult(t *testing.T, p aggTest) {
require.Equalf(t, 0, result, "%v != %v", dt.String(), p.results[2])
}

// Deprecated: migrating to testMergePartialResult(t *testing.T, p aggTest)
func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
srcChk := p.genSrcChk()
iter := chunk.NewIterator4Chunk(srcChk)

args := []expression.Expression{&expression.Column{RetType: p.dataType, Index: 0}}
if p.funcName == ast.AggFuncGroupConcat {
args = append(args, &expression.Constant{Value: types.NewStringDatum(separator), RetType: types.NewFieldType(mysql.TypeString)})
}
desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
partialDesc, finalDesc := desc.Split([]int{0, 1})

// build partial func for partial phase.
partialFunc := aggfuncs.Build(s.ctx, partialDesc, 0)
partialResult, _ := partialFunc.AllocPartialResult()

// build final func for final phase.
finalFunc := aggfuncs.Build(s.ctx, finalDesc, 0)
finalPr, _ := finalFunc.AllocPartialResult()
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{p.dataType}, 1)
if p.funcName == ast.AggFuncApproxCountDistinct {
resultChk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeString)}, 1)
}
if p.funcName == ast.AggFuncJsonArrayagg {
resultChk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeJSON)}, 1)
}

// update partial result.
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
_, err = partialFunc.UpdatePartialResult(s.ctx, []chunk.Row{row}, partialResult)
c.Assert(err, IsNil)
}
p.messUpChunk(srcChk)
err = partialFunc.AppendFinalResult2Chunk(s.ctx, partialResult, resultChk)
c.Assert(err, IsNil)
dt := resultChk.GetRow(0).GetDatum(0, p.dataType)
if p.funcName == ast.AggFuncApproxCountDistinct {
dt = resultChk.GetRow(0).GetDatum(0, types.NewFieldType(mysql.TypeString))
}
if p.funcName == ast.AggFuncJsonArrayagg {
dt = resultChk.GetRow(0).GetDatum(0, types.NewFieldType(mysql.TypeJSON))
}
result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))

_, err = finalFunc.MergePartialResult(s.ctx, partialResult, finalPr)
c.Assert(err, IsNil)
partialFunc.ResetPartialResult(partialResult)

srcChk = p.genSrcChk()
iter = chunk.NewIterator4Chunk(srcChk)
iter.Begin()
iter.Next()
for row := iter.Next(); row != iter.End(); row = iter.Next() {
_, err = partialFunc.UpdatePartialResult(s.ctx, []chunk.Row{row}, partialResult)
c.Assert(err, IsNil)
}
p.messUpChunk(srcChk)
resultChk.Reset()
err = partialFunc.AppendFinalResult2Chunk(s.ctx, partialResult, resultChk)
c.Assert(err, IsNil)
dt = resultChk.GetRow(0).GetDatum(0, p.dataType)
if p.funcName == ast.AggFuncApproxCountDistinct {
dt = resultChk.GetRow(0).GetDatum(0, types.NewFieldType(mysql.TypeString))
}
if p.funcName == ast.AggFuncJsonArrayagg {
dt = resultChk.GetRow(0).GetDatum(0, types.NewFieldType(mysql.TypeJSON))
}
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))
_, err = finalFunc.MergePartialResult(s.ctx, partialResult, finalPr)
c.Assert(err, IsNil)

if p.funcName == ast.AggFuncApproxCountDistinct {
resultChk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, 1)
}
if p.funcName == ast.AggFuncJsonArrayagg {
resultChk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeJSON)}, 1)
}
resultChk.Reset()
err = finalFunc.AppendFinalResult2Chunk(s.ctx, finalPr, resultChk)
c.Assert(err, IsNil)

dt = resultChk.GetRow(0).GetDatum(0, p.dataType)
if p.funcName == ast.AggFuncApproxCountDistinct {
dt = resultChk.GetRow(0).GetDatum(0, types.NewFieldType(mysql.TypeLonglong))
}
if p.funcName == ast.AggFuncJsonArrayagg {
dt = resultChk.GetRow(0).GetDatum(0, types.NewFieldType(mysql.TypeJSON))
}
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[2])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[2]))
}

func buildAggTester(funcName string, tp byte, numRows int, results ...interface{}) aggTest {
return buildAggTesterWithFieldType(funcName, types.NewFieldType(tp), numRows, results...)
}
Expand Down Expand Up @@ -809,95 +706,6 @@ func testAggFunc(t *testing.T, p aggTest) {
require.Equalf(t, 0, result, "%v != %v", dt.String(), p.results[0])
}

// Deprecated: migrating to func testAggFunc(t *testing.T, p aggTest)
func (s *testSuite) testAggFunc(c *C, p aggTest) {
srcChk := p.genSrcChk()

args := []expression.Expression{&expression.Column{RetType: p.dataType, Index: 0}}
if p.funcName == ast.AggFuncGroupConcat {
args = append(args, &expression.Constant{Value: types.NewStringDatum(separator), RetType: types.NewFieldType(mysql.TypeString)})
}
if p.funcName == ast.AggFuncApproxPercentile {
args = append(args, &expression.Constant{Value: types.NewIntDatum(50), RetType: types.NewFieldType(mysql.TypeLong)})
}
desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(s.ctx, desc, 0)
finalPr, _ := finalFunc.AllocPartialResult()
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1)

iter := chunk.NewIterator4Chunk(srcChk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
_, err = finalFunc.UpdatePartialResult(s.ctx, []chunk.Row{row}, finalPr)
c.Assert(err, IsNil)
}
p.messUpChunk(srcChk)
err = finalFunc.AppendFinalResult2Chunk(s.ctx, finalPr, resultChk)
c.Assert(err, IsNil)
dt := resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))

// test the empty input
resultChk.Reset()
finalFunc.ResetPartialResult(finalPr)
err = finalFunc.AppendFinalResult2Chunk(s.ctx, finalPr, resultChk)
c.Assert(err, IsNil)
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))

// test the agg func with distinct
desc, err = aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, true)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc = aggfuncs.Build(s.ctx, desc, 0)
finalPr, _ = finalFunc.AllocPartialResult()

resultChk.Reset()
srcChk = p.genSrcChk()
iter = chunk.NewIterator4Chunk(srcChk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
_, err = finalFunc.UpdatePartialResult(s.ctx, []chunk.Row{row}, finalPr)
c.Assert(err, IsNil)
}
p.messUpChunk(srcChk)
srcChk = p.genSrcChk()
iter = chunk.NewIterator4Chunk(srcChk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
_, err = finalFunc.UpdatePartialResult(s.ctx, []chunk.Row{row}, finalPr)
c.Assert(err, IsNil)
}
p.messUpChunk(srcChk)
err = finalFunc.AppendFinalResult2Chunk(s.ctx, finalPr, resultChk)
c.Assert(err, IsNil)
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))

// test the empty input
resultChk.Reset()
finalFunc.ResetPartialResult(finalPr)
err = finalFunc.AppendFinalResult2Chunk(s.ctx, finalPr, resultChk)
c.Assert(err, IsNil)
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))
}

func testAggFuncWithoutDistinct(t *testing.T, p aggTest) {
srcChk := p.genSrcChk()

Expand Down Expand Up @@ -975,37 +783,96 @@ func testAggMemFunc(t *testing.T, p aggMemTest) {
}
}

// Deprecated: migrating to testAggMemFunc(t *testing.T, p aggMemTest)
func (s *testSuite) testAggMemFunc(c *C, p aggMemTest) {
srcChk := p.aggTest.genSrcChk()
func testMultiArgsAggFunc(t *testing.T, ctx sessionctx.Context, p multiArgsAggTest) {
srcChk := p.genSrcChk()

args := []expression.Expression{&expression.Column{RetType: p.aggTest.dataType, Index: 0}}
if p.aggTest.funcName == ast.AggFuncGroupConcat {
args := make([]expression.Expression, len(p.dataTypes))
for k := 0; k < len(p.dataTypes); k++ {
args[k] = &expression.Column{RetType: p.dataTypes[k], Index: k}
}
if p.funcName == ast.AggFuncGroupConcat {
args = append(args, &expression.Constant{Value: types.NewStringDatum(separator), RetType: types.NewFieldType(mysql.TypeString)})
}
desc, err := aggregation.NewAggFuncDesc(s.ctx, p.aggTest.funcName, args, p.isDistinct)
c.Assert(err, IsNil)
if p.aggTest.orderBy {

desc, err := aggregation.NewAggFuncDesc(ctx, p.funcName, args, false)
require.NoError(t, err)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(s.ctx, desc, 0)
finalPr, memDelta := finalFunc.AllocPartialResult()
c.Assert(memDelta, Equals, p.allocMemDelta)
finalFunc := aggfuncs.Build(ctx, desc, 0)
finalPr, _ := finalFunc.AllocPartialResult()
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1)

updateMemDeltas, err := p.updateMemDeltaGens(srcChk, p.aggTest.dataType)
c.Assert(err, IsNil)
iter := chunk.NewIterator4Chunk(srcChk)
i := 0
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
memDelta, err := finalFunc.UpdatePartialResult(s.ctx, []chunk.Row{row}, finalPr)
c.Assert(err, IsNil)
c.Assert(memDelta, Equals, updateMemDeltas[i])
i++
// FIXME: cannot assert error since there are cases of error, e.g. rows were cut by GROUPCONCAT
_, _ = finalFunc.UpdatePartialResult(ctx, []chunk.Row{row}, finalPr)
}
p.messUpChunk(srcChk)
err = finalFunc.AppendFinalResult2Chunk(ctx, finalPr, resultChk)
require.NoError(t, err)
dt := resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err := dt.CompareDatum(ctx.GetSessionVars().StmtCtx, &p.results[1])
require.NoError(t, err)
require.Zerof(t, result, "%v != %v", dt.String(), p.results[1])

// test the empty input
resultChk.Reset()
finalFunc.ResetPartialResult(finalPr)
err = finalFunc.AppendFinalResult2Chunk(ctx, finalPr, resultChk)
require.NoError(t, err)
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(ctx.GetSessionVars().StmtCtx, &p.results[0])
require.NoError(t, err)
require.Zerof(t, result, "%v != %v", dt.String(), p.results[0])

// test the agg func with distinct
desc, err = aggregation.NewAggFuncDesc(ctx, p.funcName, args, true)
require.NoError(t, err)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc = aggfuncs.Build(ctx, desc, 0)
finalPr, _ = finalFunc.AllocPartialResult()

resultChk.Reset()
srcChk = p.genSrcChk()
iter = chunk.NewIterator4Chunk(srcChk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
// FIXME: cannot check error
_, _ = finalFunc.UpdatePartialResult(ctx, []chunk.Row{row}, finalPr)
}
p.messUpChunk(srcChk)
srcChk = p.genSrcChk()
iter = chunk.NewIterator4Chunk(srcChk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
// FIXME: cannot check error
_, _ = finalFunc.UpdatePartialResult(ctx, []chunk.Row{row}, finalPr)
}
p.messUpChunk(srcChk)
err = finalFunc.AppendFinalResult2Chunk(ctx, finalPr, resultChk)
require.NoError(t, err)
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(ctx.GetSessionVars().StmtCtx, &p.results[1])
require.NoError(t, err)
require.Zerof(t, result, "%v != %v", dt.String(), p.results[1])

// test the empty input
resultChk.Reset()
finalFunc.ResetPartialResult(finalPr)
err = finalFunc.AppendFinalResult2Chunk(ctx, finalPr, resultChk)
require.NoError(t, err)
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(ctx.GetSessionVars().StmtCtx, &p.results[0])
require.NoError(t, err)
require.Zero(t, result)
}

// Deprecated: migrating to testMultiArgsAggFunc(t *testing.T, ctx sessionctx.Context, p multiArgsAggTest)
func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
srcChk := p.genSrcChk()

Expand Down Expand Up @@ -1095,6 +962,41 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
c.Assert(result, Equals, 0)
}

func testMultiArgsAggMemFunc(t *testing.T, p multiArgsAggMemTest) {
srcChk := p.multiArgsAggTest.genSrcChk()
ctx := mock.NewContext()

args := make([]expression.Expression, len(p.multiArgsAggTest.dataTypes))
for k := 0; k < len(p.multiArgsAggTest.dataTypes); k++ {
args[k] = &expression.Column{RetType: p.multiArgsAggTest.dataTypes[k], Index: k}
}
if p.multiArgsAggTest.funcName == ast.AggFuncGroupConcat {
args = append(args, &expression.Constant{Value: types.NewStringDatum(separator), RetType: types.NewFieldType(mysql.TypeString)})
}

desc, err := aggregation.NewAggFuncDesc(ctx, p.multiArgsAggTest.funcName, args, p.isDistinct)
require.NoError(t, err)
if p.multiArgsAggTest.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(ctx, desc, 0)
finalPr, memDelta := finalFunc.AllocPartialResult()
require.Equal(t, p.allocMemDelta, memDelta)

updateMemDeltas, err := p.multiArgsUpdateMemDeltaGens(srcChk, p.multiArgsAggTest.dataTypes, desc.OrderByItems)
require.NoError(t, err)
iter := chunk.NewIterator4Chunk(srcChk)
i := 0
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
memDelta, _ := finalFunc.UpdatePartialResult(ctx, []chunk.Row{row}, finalPr)
require.Equal(t, updateMemDeltas[i], memDelta)
i++
}
}

// Deprecated: migrating to testMultiArgsAggMemFunc(t *testing.T, p multiArgsAggMemTest)
func (s *testSuite) testMultiArgsAggMemFunc(c *C, p multiArgsAggMemTest) {
srcChk := p.multiArgsAggTest.genSrcChk()

Expand Down Expand Up @@ -1127,7 +1029,6 @@ func (s *testSuite) testMultiArgsAggMemFunc(c *C, p multiArgsAggMemTest) {
i++
}
}

func (s *testSuite) benchmarkAggFunc(b *testing.B, p aggTest) {
srcChk := chunk.NewChunkWithCapacity([]*types.FieldType{p.dataType}, p.numRows)
for i := 0; i < p.numRows; i++ {
Expand Down
Loading

0 comments on commit 3ca590a

Please sign in to comment.