Commit

Merge branch 'master' into misono/mock-tikv-raw-delete-range
MyonKeminta committed Apr 3, 2018
2 parents c88dac2 + ccf6da1 commit 05d88e4
Showing 97 changed files with 3,551 additions and 1,547 deletions.
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions ast/ast.go
@@ -137,8 +137,8 @@ type RecordSet interface {
     // Fields gets result fields.
     Fields() []*ResultField

-    // NextChunk reads records into chunk.
-    NextChunk(ctx context.Context, chk *chunk.Chunk) error
+    // Next reads records into chunk.
+    Next(ctx context.Context, chk *chunk.Chunk) error

     // NewChunk creates a new chunk with initial capacity.
     NewChunk() *chunk.Chunk
1 change: 1 addition & 0 deletions ast/ddl.go
@@ -745,6 +745,7 @@ type AlterTableSpec struct {
     OldColumnName *ColumnName
     Position      *ColumnPosition
     LockType      LockType
+    Comment       string
 }

 // Accept implements Node Accept interface.
1 change: 1 addition & 0 deletions ast/misc.go
@@ -591,6 +591,7 @@ const (
     AdminCancelDDLJobs
     AdminCheckIndex
     AdminRecoverIndex
+    AdminCleanupIndex
     AdminCheckIndexRange
     AdminShowDDLJobQueries
     AdminChecksumTable
2 changes: 1 addition & 1 deletion cmd/benchdb/main.go
@@ -119,7 +119,7 @@ func (ut *benchDB) mustExec(sql string) {
     rs := rss[0]
     chk := rs.NewChunk()
     for {
-        err := rs.NextChunk(ctx, chk)
+        err := rs.Next(ctx, chk)
         if err != nil {
             log.Fatal(err)
         }
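This is the caller-side pattern after the rename: Next fills a caller-owned chunk that is reused across calls, and a zero-row chunk marks the end of the result set. Below is a minimal, runnable sketch of that contract using toy stand-ins for ast.RecordSet and chunk.Chunk; the type names and the batch size of 2 are illustrative, not TiDB's real API.

package main

import (
    "context"
    "fmt"
)

// rowChunk is a toy stand-in for chunk.Chunk.
type rowChunk struct{ rows []string }

func (c *rowChunk) NumRows() int { return len(c.rows) }

// recordSet is a toy stand-in for ast.RecordSet after the rename:
// Next (formerly NextChunk) refills a reused chunk, and a chunk with
// zero rows signals that the result set is drained.
type recordSet struct{ pending []string }

func (rs *recordSet) NewChunk() *rowChunk { return &rowChunk{} }

func (rs *recordSet) Next(ctx context.Context, chk *rowChunk) error {
    chk.rows = chk.rows[:0] // reuse the chunk's memory across calls
    for len(rs.pending) > 0 && len(chk.rows) < 2 {
        chk.rows = append(chk.rows, rs.pending[0])
        rs.pending = rs.pending[1:]
    }
    return nil
}

func main() {
    rs := &recordSet{pending: []string{"a", "b", "c"}}
    chk := rs.NewChunk()
    for {
        if err := rs.Next(context.Background(), chk); err != nil {
            panic(err)
        }
        if chk.NumRows() == 0 {
            break // empty chunk: no more rows
        }
        fmt.Println(chk.rows)
    }
}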
3 changes: 0 additions & 3 deletions config/config.toml.example
@@ -31,9 +31,6 @@ split-table = true
 # The limit of concurrent executed sessions.
 token-limit = 1000

-# Enable chunk executors.
-enable-chunk = true
-
 # Only print a log when out of memory quota.
 # Valid options: ["log", "cancel"]
 oom-action = "log"
29 changes: 29 additions & 0 deletions ddl/ddl_api.go
@@ -934,6 +934,9 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
             err = d.ShardRowID(ctx, ident, opt.UintValue)
         case ast.TableOptionAutoIncrement:
             err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue))
+        case ast.TableOptionComment:
+            spec.Comment = opt.StrValue
+            err = d.AlterTableComment(ctx, ident, spec)
         }
         if err != nil {
             return errors.Trace(err)
@@ -1448,6 +1451,32 @@ func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
     return errors.Trace(err)
 }

+// AlterTableComment updates the table comment information.
+func (d *ddl) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
+    is := d.infoHandle.Get()
+    schema, ok := is.SchemaByName(ident.Schema)
+    if !ok {
+        return infoschema.ErrDatabaseNotExists.GenByArgs(ident.Schema)
+    }
+
+    tb, err := is.TableByName(ident.Schema, ident.Name)
+    if err != nil {
+        return errors.Trace(infoschema.ErrTableNotExists.GenByArgs(ident.Schema, ident.Name))
+    }
+
+    job := &model.Job{
+        SchemaID:   schema.ID,
+        TableID:    tb.Meta().ID,
+        Type:       model.ActionModifyTableComment,
+        BinlogInfo: &model.HistoryInfo{},
+        Args:       []interface{}{spec.Comment},
+    }
+
+    err = d.doDDLJob(ctx, job)
+    err = d.callHookOnChanged(err)
+    return errors.Trace(err)
+}
+
 // DropTable will proceed even if some table in the list does not exists.
 func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
     is := d.GetInformationSchema()
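The new DDL job carries the comment through Job.Args, and the worker later recovers it with job.DecodeArgs (see ddl/table.go further down). Below is a hedged sketch of that argument round trip; it assumes JSON serialization, which is in the same spirit as model.Job's raw-args encoding, and the job struct and helper names are local stand-ins rather than TiDB's model package.

package main

import (
    "encoding/json"
    "fmt"
)

// job is a local stand-in for model.Job: the API side packs the new
// comment into Args, and the worker side decodes it back out.
type job struct {
    Type string
    Args []interface{}
}

// encodeArgs mimics how a job's arguments travel through the job queue.
func (j *job) encodeArgs() ([]byte, error) { return json.Marshal(j.Args) }

// decodeComment plays the role of job.DecodeArgs(&comment) in
// onModifyTableComment.
func decodeComment(raw []byte) (string, error) {
    var args []interface{}
    if err := json.Unmarshal(raw, &args); err != nil {
        return "", err
    }
    if len(args) == 0 {
        return "", fmt.Errorf("no job args")
    }
    comment, ok := args[0].(string)
    if !ok {
        return "", fmt.Errorf("unexpected arg type %T", args[0])
    }
    return comment, nil
}

func main() {
    j := &job{Type: "modify table comment", Args: []interface{}{"user accounts"}}
    raw, err := j.encodeArgs()
    if err != nil {
        panic(err)
    }
    comment, err := decodeComment(raw)
    if err != nil {
        panic(err)
    }
    fmt.Println(comment) // user accounts
}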
25 changes: 23 additions & 2 deletions ddl/ddl_db_test.go
@@ -167,6 +167,9 @@ func (s *testDBSuite) TestMySQLErrorCode(c *C) {
     s.testErrorCode(c, sql, tmysql.ErrDupFieldName)
     sql = "alter table test_error_code_succ add column aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa int"
     s.testErrorCode(c, sql, tmysql.ErrTooLongIdent)
+    sql = "alter table test_comment comment 'test comment'"
+    s.testErrorCode(c, sql, tmysql.ErrNoSuchTable)
+
     // drop column
     sql = "alter table test_error_code_succ drop c_not_exist"
     s.testErrorCode(c, sql, tmysql.ErrCantDropFieldOrKey)
@@ -323,6 +326,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
     }

     var checkErr error
+    var c3IdxInfo *model.IndexInfo
     hook := &ddl.TestDDLCallback{}
     first := true
     oldReorgWaitTimeout := ddl.ReorgWaitTimeout
@@ -335,6 +339,16 @@
         // If the action is adding an index and the state is writing reorganization, we want to test the case of cancelling the job when backfilling indexes.
         // When the job satisfies this case of addIndexNotFirstReorg, the worker will start to backfill indexes.
         if !addIndexNotFirstReorg {
+            // Get the index's meta.
+            if c3IdxInfo != nil {
+                return
+            }
+            t := s.testGetTable(c, "t1")
+            for _, index := range t.WritableIndices() {
+                if index.Meta().Name.L == "c3_index" {
+                    c3IdxInfo = index.Meta()
+                }
+            }
             return
         }
         // The job satisfies the case of addIndexNotFirst for the first time, the worker hasn't finished a batch of backfill indexes.
@@ -407,6 +421,10 @@ LOOP:
         c.Assert(strings.EqualFold(tidx.Meta().Name.L, "c3_index"), IsFalse)
     }

+    ctx := s.s.(sessionctx.Context)
+    idx := tables.NewIndex(t.Meta(), c3IdxInfo)
+    checkDelRangeDone(c, ctx, idx)
+
     s.mustExec(c, "drop table t1")
     ddl.ReorgWaitTimeout = oldReorgWaitTimeout
 }
@@ -713,6 +731,11 @@ LOOP:
     c.Assert(nidx, IsNil)

     idx := tables.NewIndex(t.Meta(), c3idx.Meta())
+    checkDelRangeDone(c, ctx, idx)
+    s.tk.MustExec("drop table test_drop_index")
+}
+
+func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) {
     f := func() map[int64]struct{} {
         handles := make(map[int64]struct{})

@@ -745,8 +768,6 @@
         }
     }
     c.Assert(handles, HasLen, 0)
-
-    s.tk.MustExec("drop table test_drop_index")
 }

 func (s *testDBSuite) TestAddIndexWithDupCols(c *C) {
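checkDelRangeDone (its body is partly collapsed above) waits for the asynchronous delete-range worker to clear the index's key range. The shape of such a check is poll-until-empty with a deadline; in the hedged sketch below, scan is a stand-in for the snapshot scan the real test performs.

package main

import (
    "fmt"
    "time"
)

// waitRangeEmpty re-runs scan until the key range comes back empty or
// the deadline passes, sleeping between attempts. This mirrors the
// polling idea behind checkDelRangeDone, not its exact code.
func waitRangeEmpty(scan func() int, timeout, step time.Duration) bool {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if scan() == 0 {
            return true
        }
        time.Sleep(step)
    }
    return false
}

func main() {
    remaining := 3
    scan := func() int {
        if remaining > 0 {
            remaining-- // pretend the delete-range worker removed some keys
        }
        return remaining
    }
    fmt.Println(waitRangeEmpty(scan, time.Second, 10*time.Millisecond)) // true
}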
37 changes: 26 additions & 11 deletions ddl/ddl_worker.go
@@ -147,23 +147,36 @@ func (d *ddl) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error {
     return errors.Trace(t.UpdateDDLJob(0, job, updateRawArgs))
 }

+func (d *ddl) deleteRange(job *model.Job) error {
+    var err error
+    if job.Version <= currentVersion {
+        err = d.delRangeManager.addDelRangeJob(job)
+    } else {
+        err = errInvalidJobVersion.GenByArgs(job.Version, currentVersion)
+    }
+    return errors.Trace(err)
+}
+
 // finishDDLJob deletes the finished DDL job in the ddl queue and puts it into the history queue.
 // If the DDL job needs to be handled in the background, it will prepare a background job.
 func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
     startTime := time.Now()
     defer func() {
         metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
     }()

     switch job.Type {
-    case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex:
-        if job.Version <= currentVersion {
-            err = d.delRangeManager.addDelRangeJob(job)
-        } else {
-            err = errInvalidJobVersion.GenByArgs(job.Version, currentVersion)
-        }
-        if err != nil {
-            return errors.Trace(err)
-        }
+    case model.ActionAddIndex:
+        if job.State != model.JobStateRollbackDone {
+            break
+        }
+        // After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
+        err = d.deleteRange(job)
+    case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex:
+        err = d.deleteRange(job)
     }
+    if err != nil {
+        return errors.Trace(err)
+    }

     _, err = t.DeQueueDDLJob()
@@ -254,9 +267,9 @@ func (d *ddl) handleDDLJobQueue() error {
         d.hookMu.Unlock()

         // Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
-        // If the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
+        // If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
         // the newest schema.
-        if job.State == model.JobStateRunning || job.State == model.JobStateDone {
+        if job.IsRunning() || job.IsRollingback() || job.IsDone() {
             d.waitSchemaChanged(nil, waitTime, schemaVer)
         }
         if job.IsSynced() {
@@ -329,6 +342,8 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64, err error) {
         ver, err = d.onSetDefaultValue(t, job)
     case model.ActionShardRowID:
         ver, err = d.onShardRowID(t, job)
+    case model.ActionModifyTableComment:
+        ver, err = d.onModifyTableComment(t, job)
     default:
         // Invalid job, cancel it.
         job.State = model.JobStateCancelled
@@ -417,7 +432,7 @@ func (d *ddl) waitSchemaChanged(ctx context.Context, waitTime time.Duration, lat
 // So here we get the latest schema version to make sure all servers' schema version update to the latest schema version
 // in a cluster, or to wait for 2 * lease time.
 func (d *ddl) waitSchemaSynced(job *model.Job, waitTime time.Duration) {
-    if !job.IsRunning() && !job.IsDone() {
+    if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() {
         return
     }
     // TODO: Make ctx exits when the d is close.
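The net effect of this refactor is a single decision point for when a finished job must enqueue a delete range: drop-style jobs always do, while AddIndex does so only after a rollback, to purge half-written index data. Below is a small self-contained sketch of that dispatch, with local stand-ins for the model package's action and state constants.

package main

import "fmt"

// actionType and jobState are local stand-ins for model.ActionType and
// model.JobState; only the cases needed for the illustration exist.
type actionType int

const (
    actionAddIndex actionType = iota
    actionDropTable
    actionCreateTable
)

type jobState int

const (
    stateDone jobState = iota
    stateRollbackDone
)

type job struct {
    tp    actionType
    state jobState
}

// needsDeleteRange mirrors the switch in finishDDLJob: drop-style jobs
// always enqueue a delete range, AddIndex only after a rollback.
func needsDeleteRange(j job) bool {
    switch j.tp {
    case actionAddIndex:
        return j.state == stateRollbackDone
    case actionDropTable:
        return true
    }
    return false
}

func main() {
    fmt.Println(needsDeleteRange(job{actionAddIndex, stateDone}))         // false
    fmt.Println(needsDeleteRange(job{actionAddIndex, stateRollbackDone})) // true
    fmt.Println(needsDeleteRange(job{actionDropTable, stateDone}))        // true
}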
7 changes: 7 additions & 0 deletions ddl/delete_range.go
@@ -234,6 +234,13 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error
         startKey := tablecodec.EncodeTablePrefix(tableID)
         endKey := tablecodec.EncodeTablePrefix(tableID + 1)
         return doInsert(s, job.ID, tableID, startKey, endKey, now)
+    // ActionAddIndex needs this too, because a cancelled AddIndex job must roll back by deleting its half-written index data.
+    case model.ActionAddIndex:
+        tableID := job.TableID
+        indexID := job.Args[1].(int64)
+        startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
+        endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
+        return doInsert(s, job.ID, indexID, startKey, endKey, now)
     case model.ActionDropIndex:
         tableID := job.TableID
         var indexName interface{}
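The delete range for a cancelled AddIndex is the half-open key interval [EncodeTableIndexPrefix(tableID, indexID), EncodeTableIndexPrefix(tableID, indexID+1)): bumping the index ID by one yields the first key past every record of that index. The toy encoder below illustrates the idea; its fixed-width big-endian layout is a simplified stand-in, not tablecodec's real encoding.

package main

import (
    "encoding/binary"
    "fmt"
)

// encodeIndexPrefix builds a toy key prefix shaped like
// t{tableID}_i{indexID} with fixed-width big-endian integers, so that
// byte-wise comparison orders keys by (tableID, indexID).
func encodeIndexPrefix(tableID, indexID int64) []byte {
    buf := make([]byte, 19)
    buf[0] = 't'
    binary.BigEndian.PutUint64(buf[1:], uint64(tableID))
    copy(buf[9:], "_i")
    binary.BigEndian.PutUint64(buf[11:], uint64(indexID))
    return buf
}

func main() {
    // Every key of index 7 in table 42 sorts inside [start, end).
    start := encodeIndexPrefix(42, 7)
    end := encodeIndexPrefix(42, 8)
    fmt.Printf("delete [%x, %x)\n", start, end)
}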
21 changes: 21 additions & 0 deletions ddl/table.go
@@ -335,6 +335,27 @@ func (d *ddl) onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
     return ver, nil
 }

+func (d *ddl) onModifyTableComment(t *meta.Meta, job *model.Job) (ver int64, _ error) {
+    var comment string
+    if err := job.DecodeArgs(&comment); err != nil {
+        job.State = model.JobStateCancelled
+        return ver, errors.Trace(err)
+    }
+
+    tblInfo, err := getTableInfo(t, job, job.SchemaID)
+    if err != nil {
+        return ver, errors.Trace(err)
+    }
+
+    tblInfo.Comment = comment
+    ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
+    if err != nil {
+        return ver, errors.Trace(err)
+    }
+    job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
+    return ver, nil
+}
+
 func checkTableNotExists(t *meta.Meta, job *model.Job, schemaID int64, tableName string) error {
     // Check this table's database.
     tables, err := t.ListTables(schemaID)
2 changes: 1 addition & 1 deletion ddl/util/util.go
@@ -58,7 +58,7 @@ func LoadDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []DelRan
     chk := rs.NewChunk()
     it := chunk.NewIterator4Chunk(chk)
     for {
-        err = rs.NextChunk(context.TODO(), chk)
+        err = rs.Next(context.TODO(), chk)
         if err != nil {
             return nil, errors.Trace(err)
         }
27 changes: 4 additions & 23 deletions distsql/distsql.go
@@ -42,12 +42,10 @@ var (

 // SelectResult is an iterator of coprocessor partial results.
 type SelectResult interface {
-    // Next gets the next partial result.
-    Next(context.Context) (PartialResult, error)
     // NextRaw gets the next raw result.
     NextRaw(context.Context) ([]byte, error)
-    // NextChunk reads the data into chunk.
-    NextChunk(context.Context, *chunk.Chunk) error
+    // Next reads the data into chunk.
+    Next(context.Context, *chunk.Chunk) error
     // Close closes the iterator.
     Close() error
     // Fetch fetches partial results from client.
@@ -121,23 +119,6 @@ func (r *selectResult) fetch(ctx context.Context) {
     }
 }

-// Next returns the next row.
-func (r *selectResult) Next(ctx context.Context) (PartialResult, error) {
-    re := <-r.results
-    if re.err != nil {
-        return nil, errors.Trace(re.err)
-    }
-    if re.result == nil {
-        return nil, nil
-    }
-    pr := &partialResult{}
-    pr.rowLen = r.rowLen
-    err := pr.unmarshal(re.result.GetData())
-    r.feedback.Update(re.result.GetStartKey(), pr.resp.OutputCounts)
-    r.partialCount++
-    return pr, errors.Trace(err)
-}
-
 // NextRaw returns the next raw partial result.
 func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) {
     re := <-r.results
@@ -149,8 +130,8 @@ func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) {
     return re.result.GetData(), nil
 }

-// NextChunk reads data to the chunk.
-func (r *selectResult) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
+// Next reads data to the chunk.
+func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
     chk.Reset()
     for chk.NumRows() < r.ctx.GetSessionVars().MaxChunkSize {
         if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) {
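selectResult.Next batches rows: it keeps copying from fetched partial results into the caller's chunk until the chunk reaches MaxChunkSize or the stream is exhausted. The runnable sketch below shows that batching shape, with a buffered channel standing in for the coprocessor response stream and an int slice standing in for chunk.Chunk.

package main

import "fmt"

// chunkBuf is a toy stand-in for chunk.Chunk.
type chunkBuf struct{ rows []int }

func (c *chunkBuf) reset()       { c.rows = c.rows[:0] }
func (c *chunkBuf) numRows() int { return len(c.rows) }

// next fills chk from the stream until it holds maxChunkSize rows or
// the stream is exhausted; it reports whether any rows were read.
func next(stream <-chan int, chk *chunkBuf, maxChunkSize int) bool {
    chk.reset()
    for chk.numRows() < maxChunkSize {
        row, ok := <-stream
        if !ok {
            break // stream exhausted
        }
        chk.rows = append(chk.rows, row)
    }
    return chk.numRows() > 0
}

func main() {
    stream := make(chan int, 10)
    for i := 0; i < 10; i++ {
        stream <- i
    }
    close(stream)

    chk := &chunkBuf{}
    for next(stream, chk, 4) { // batches of at most 4 rows
        fmt.Println(chk.rows)
    }
}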
12 changes: 6 additions & 6 deletions distsql/distsql_test.go
@@ -54,7 +54,7 @@ func (s *testSuite) TestSelectNormal(c *C) {
     colTypes = append(colTypes, colTypes[0])
     colTypes = append(colTypes, colTypes[0])

-    // Test NextChunk.
+    // Test Next.
     response, err := Select(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
     c.Assert(err, IsNil)
     result, ok := response.(*selectResult)
@@ -64,11 +64,11 @@

     response.Fetch(context.TODO())

-    // Test NextChunk.
+    // Test Next.
     chk := chunk.NewChunk(colTypes)
     numAllRows := 0
     for {
-        err = response.NextChunk(context.TODO(), chk)
+        err = response.Next(context.TODO(), chk)
         c.Assert(err, IsNil)
         numAllRows += chk.NumRows()
         if chk.NumRows() == 0 {
@@ -108,7 +108,7 @@ func (s *testSuite) TestSelectStreaming(c *C) {

     s.sctx.GetSessionVars().EnableStreaming = true

-    // Test NextChunk.
+    // Test Next.
     response, err := Select(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
     c.Assert(err, IsNil)
     result, ok := response.(*streamResult)
@@ -117,11 +117,11 @@

     response.Fetch(context.TODO())

-    // Test NextChunk.
+    // Test Next.
     chk := chunk.NewChunk(colTypes)
     numAllRows := 0
     for {
-        err = response.NextChunk(context.TODO(), chk)
+        err = response.Next(context.TODO(), chk)
         c.Assert(err, IsNil)
         numAllRows += chk.NumRows()
         if chk.NumRows() == 0 {
(Diff truncated: the remaining files of the 97 changed are not rendered here.)