Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add table feature flag #21346

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions pkg/partitionservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,16 @@ func (s *service) Delete(
return err
}

func (s *service) Is(
func (s *service) GetPartitionMetadata(
ctx context.Context,
tableID uint64,
txnOp client.TxnOperator,
) (bool, partition.PartitionMetadata, error) {
) (partition.PartitionMetadata, error) {
if !s.cfg.Enable {
return false, partition.PartitionMetadata{}, nil
return partition.PartitionMetadata{}, nil
}

metadata, err := s.readMetadata(ctx, tableID, txnOp)
if err != nil {
return false, partition.PartitionMetadata{}, err
}
return !metadata.IsEmpty(), metadata, nil
return s.readMetadata(ctx, tableID, txnOp)
}

func (s *service) getMetadata(
Expand Down
36 changes: 0 additions & 36 deletions pkg/partitionservice/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,42 +53,6 @@ func TestDelete(t *testing.T) {
require.NotEmpty(t, s.mu.tables)

require.NoError(t, s.Delete(ctx, tableID, nil))

ok, metadata, err := s.Is(ctx, tableID, nil)
require.NoError(t, err)
require.False(t, ok)
require.True(t, metadata.IsEmpty())
},
)
}

func TestIs(t *testing.T) {
num := uint64(2)
tableID := uint64(1)
columns := []string{"a"}

runTestPartitionServiceTest(
func(
ctx context.Context,
txnOp client.TxnOperator,
s *service,
store *memStorage,
) {
def := newTestTableDefine(1, columns, []types.T{types.T_int8})
store.addUncommittedTable(def)

stmt := newTestHashOption(t, columns[0], num)
assert.NoError(t, s.Create(ctx, tableID, stmt, txnOp))

ok, metadata, err := s.Is(ctx, tableID, txnOp)
require.NoError(t, err)
require.True(t, ok)
require.True(t, !metadata.IsEmpty())

ok, metadata, err = s.Is(ctx, tableID+1, txnOp)
require.NoError(t, err)
require.False(t, ok)
require.True(t, metadata.IsEmpty())
},
)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/partitionservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ var (

// PartitionService is used to maintaining the metadata of the partition table.
type PartitionService interface {
Is(
ctx context.Context,
tableID uint64,
txnOp client.TxnOperator,
) (bool, partition.PartitionMetadata, error)

// Create creates metadata of the partition table.
Create(
ctx context.Context,
Expand All @@ -74,6 +68,12 @@ type PartitionService interface {
txnOp client.TxnOperator,
) error

GetPartitionMetadata(
ctx context.Context,
tableID uint64,
txnOp client.TxnOperator,
) (partition.PartitionMetadata, error)

Prune(
ctx context.Context,
tableID uint64,
Expand Down
348 changes: 192 additions & 156 deletions pkg/pb/api/api.pb.go

Large diffs are not rendered by default.

1,498 changes: 746 additions & 752 deletions pkg/pb/plan/plan.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -3337,7 +3337,7 @@ func (c *Compile) compilePreInsertSK(n *plan.Node, ss []*Scope) []*Scope {

func (c *Compile) compileDelete(n *plan.Node, ss []*Scope) ([]*Scope, error) {
currentFirstFlag := c.anal.isFirst
op, err := constructDeletion(n, c.e, c.proc)
op, err := constructDeletion(n, c.e)
if err != nil {
return nil, err
}
Expand Down
60 changes: 40 additions & 20 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/shardservice"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/lockop"
"github.com/matrixorigin/matrixone/pkg/sql/features"
"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
Expand Down Expand Up @@ -1439,24 +1440,26 @@ func (s *Scope) CreateTable(c *Compile) error {
}
}

if qry.IsPartition {
// cannot has err.
stmt, _ := parsers.ParseOne(
c.proc.Ctx,
dialect.MYSQL,
qry.RawSQL,
c.getLower(),
)
if !features.IsPartitioned(qry.TableDef.FeatureFlag) {
return nil
}

err = partitionservice.GetService(c.proc.GetService()).Create(
c.proc.Ctx,
qry.GetTableDef().TblId,
stmt.(*tree.CreateTable),
c.proc.GetTxnOperator(),
)
if err != nil {
return err
}
// cannot has err.
stmt, _ := parsers.ParseOne(
c.proc.Ctx,
dialect.MYSQL,
qry.RawSQL,
c.getLower(),
)

err = partitionservice.GetService(c.proc.GetService()).Create(
c.proc.Ctx,
qry.TableDef.TblId,
stmt.(*tree.CreateTable),
c.proc.GetTxnOperator(),
)
if err != nil {
return err
}

return shardservice.GetService(c.proc.GetService()).Create(
Expand Down Expand Up @@ -2658,6 +2661,7 @@ func (s *Scope) DropTable(c *Compile) error {
var planDefsToExeDefs = func(tableDef *plan.TableDef) ([]engine.TableDef, error) {
planDefs := tableDef.GetDefs()
var exeDefs []engine.TableDef
var propDef *engine.PropertiesDef
c := new(engine.ConstraintDef)
for _, def := range planDefs {
switch defVal := def.GetDef().(type) {
Expand All @@ -2669,15 +2673,31 @@ var planDefsToExeDefs = func(tableDef *plan.TableDef) ([]engine.TableDef, error)
Value: p.GetValue(),
}
}
exeDefs = append(exeDefs, &engine.PropertiesDef{
Properties: properties,
})
propDef = &engine.PropertiesDef{Properties: properties}
exeDefs = append(exeDefs, propDef)
c.Cts = append(c.Cts, &engine.StreamConfigsDef{
Configs: defVal.Properties.GetProperties(),
})
}
}

if propDef == nil {
propDef = &engine.PropertiesDef{Properties: make([]engine.Property, 0)}
}
propDef.Properties = append(
propDef.Properties,
engine.Property{
Key: catalog.PropSchemaExtra,
Value: string(
api.MustMarshalTblExtra(
&api.SchemaExtra{
FeatureFlag: tableDef.FeatureFlag,
},
),
),
},
)

if tableDef.Indexes != nil {
c.Cts = append(c.Cts, &engine.IndexDef{
Indexes: tableDef.Indexes,
Expand Down
44 changes: 5 additions & 39 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/colexec/unionall"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/value_scan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/window"
"github.com/matrixorigin/matrixone/pkg/sql/features"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
Expand Down Expand Up @@ -660,7 +661,7 @@ func constructRestrict(n *plan.Node, filterExpr *plan.Expr) *filter.Filter {
return op
}

func constructDeletion(n *plan.Node, eg engine.Engine, proc *process.Process) (vm.Operator, error) {
func constructDeletion(n *plan.Node, eg engine.Engine) (vm.Operator, error) {
oldCtx := n.DeleteCtx
delCtx := &deletion.DeleteCtx{
Ref: oldCtx.Ref,
Expand All @@ -674,20 +675,9 @@ func constructDeletion(n *plan.Node, eg engine.Engine, proc *process.Process) (v
op := deletion.NewArgument()
op.DeleteCtx = delCtx

ps := proc.GetPartitionService()
ok, _, err := ps.Is(
proc.Ctx,
oldCtx.TableDef.TblId,
proc.GetTxnOperator(),
)
if err != nil {
return nil, err
}

if !ok {
if !features.IsPartitioned(oldCtx.TableDef.FeatureFlag) {
return op, nil
}

return deletion.NewPartitionDelete(op, oldCtx.TableDef.TblId), nil
}

Expand Down Expand Up @@ -875,17 +865,7 @@ func constructMultiUpdate(
}
arg.Action = action

ps := proc.GetPartitionService()
ok, _, err := ps.Is(
proc.Ctx,
n.UpdateCtxList[0].TableDef.TblId,
proc.GetTxnOperator(),
)
if err != nil {
return nil, err
}

if !ok {
if !features.IsPartitioned(n.UpdateCtxList[0].TableDef.FeatureFlag) {
return arg, nil
}

Expand Down Expand Up @@ -919,21 +899,7 @@ func constructInsert(
arg.InsertCtx = newCtx
arg.ToWriteS3 = toS3

ps := proc.GetPartitionService()
if ps == nil {
return arg, nil
}

ok, _, err := ps.Is(
proc.Ctx,
oldCtx.TableDef.TblId,
proc.GetTxnOperator(),
)
if err != nil {
return nil, err
}

if !ok {
if !features.IsPartitioned(oldCtx.TableDef.FeatureFlag) {
return arg, nil
}

Expand Down
Loading
Loading