Skip to content

Commit

Permalink
[Cherry-pick] add columns to mo_pubs/subs (#21059)
Browse files Browse the repository at this point in the history
add columns to mo_pubs/subs

Approved by: @daviszhen, @heni02, @zhangxu19830126
  • Loading branch information
ck89119 authored Jan 3, 2025
1 parent 46408f4 commit a9be293
Show file tree
Hide file tree
Showing 16 changed files with 276 additions and 212 deletions.
48 changes: 48 additions & 0 deletions pkg/bootstrap/versions/v2_0_2/cluster_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (

var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_cdc_watermark,
upg_mo_pubs_add_account_name,
upg_mo_subs_add_sub_account_name,
upg_mo_subs_add_pub_account_id,
}

var upg_mo_cdc_watermark = versions.UpgradeEntry{
Expand All @@ -37,3 +40,48 @@ var upg_mo_cdc_watermark = versions.UpgradeEntry{
return !colInfo.IsExits, nil
},
}

var upg_mo_pubs_add_account_name = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_PUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_pubs add column account_name varchar(300) after account_id",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_PUBS, "account_name")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_pubs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.account_id = t2.account_id SET t1.account_name = t2.account_name",
}

var upg_mo_subs_add_sub_account_name = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_SUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_subs add column sub_account_name VARCHAR(300) NOT NULL after sub_account_id",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_SUBS, "sub_account_name")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_subs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.sub_account_id = t2.account_id SET t1.sub_account_name = t2.account_name",
}

var upg_mo_subs_add_pub_account_id = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_SUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_subs add column pub_account_id INT NOT NULL after sub_time",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_SUBS, "pub_account_id")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_subs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.pub_account_name = t2.account_name SET t1.pub_account_id = t2.account_id",
}
2 changes: 2 additions & 0 deletions pkg/common/pubsub/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ func (pubInfo *PubInfo) GetCreateSql() string {

type SubInfo struct {
SubAccountId int32
SubAccountName string
SubName string
SubTime string
PubAccountId int32
PubAccountName string
PubName string
PubDbName string
Expand Down
55 changes: 37 additions & 18 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3613,24 +3613,46 @@ func doDropAccount(ctx context.Context, bh BackgroundExec, ses *Session, da *dro
return moerr.NewInternalErrorf(ctx, "can not delete the account %s", da.Name)
}

checkAccount := func(accountName string) (accountId int64, version uint64, ok bool, err error) {
if sql, err = getSqlForCheckTenant(ctx, da.Name); err != nil {
return
}

bh.ClearExecResultSet()
if err = bh.Exec(ctx, sql); err != nil {
return
}

if erArray, err = getResultSet(ctx, bh); err != nil {
return
}

if execResultArrayHasData(erArray) {
if accountId, err = erArray[0].GetInt64(ctx, 0, 0); err != nil {
return
}
if version, err = erArray[0].GetUint64(ctx, 0, 3); err != nil {
return
}
ok = true
}

return
}

dropAccountFunc := func() (rtnErr error) {
ses.Infof(ctx, "dropAccount %s sql: %s", da.Name, getAccountIdNamesSql)
_, nameInfoMap, rtnErr := getAccounts(ctx, bh, true)
if rtnErr != nil {
return rtnErr
if accountId, version, hasAccount, rtnErr = checkAccount(da.Name); rtnErr != nil {
return
}

//check the account exists or not
if _, ok := nameInfoMap[da.Name]; !ok {
//no such account
if !da.IfExists { //when the "IF EXISTS" is set, just skip it.
// check the account exists or not
if !hasAccount {
// when the "IF EXISTS" is set, just skip it.
if !da.IfExists {
rtnErr = moerr.NewInternalErrorf(ctx, "there is no account %s", da.Name)
}
hasAccount = false
return
}
accountId = int64(nameInfoMap[da.Name].Id)
version = nameInfoMap[da.Name].Version

//drop tables of the tenant
//NOTE!!!: single DDL drop statement per single transaction
Expand Down Expand Up @@ -3661,7 +3683,7 @@ func doDropAccount(ctx context.Context, bh BackgroundExec, ses *Session, da *dro
}
for _, pubInfo := range pubInfos {
ses.Infof(ctx, "dropAccount %s sql: %s", da.Name, pubInfo.PubName)
if rtnErr = dropPublication(deleteCtx, bh, true, pubInfo.PubName); rtnErr != nil {
if rtnErr = dropPublication(deleteCtx, bh, true, pubInfo.PubAccountName, pubInfo.PubName); rtnErr != nil {
return
}
}
Expand Down Expand Up @@ -3721,13 +3743,8 @@ func doDropAccount(ctx context.Context, bh BackgroundExec, ses *Session, da *dro
return rtnErr
}
for _, subInfo := range subInfos {
pubAccInfo, ok := nameInfoMap[subInfo.PubAccountName]
if !ok {
continue
}

ses.Infof(ctx, "dropAccount %s sql: %s %s", da.Name, updatePubInfoAccountListFormat, subInfo.PubName)
if rtnErr = dropSubAccountNameInSubAccounts(deleteCtx, bh, pubAccInfo.Id, subInfo.PubName, da.Name); rtnErr != nil {
if rtnErr = dropSubAccountNameInSubAccounts(deleteCtx, bh, subInfo.PubAccountId, subInfo.PubName, da.Name); rtnErr != nil {
return rtnErr
}
}
Expand Down Expand Up @@ -7691,6 +7708,8 @@ func createSubscription(ctx context.Context, bh BackgroundExec, newTenant *Tenan
for _, pubInfo := range pubInfos {
subInfo := &pubsub.SubInfo{
SubAccountId: int32(newTenant.TenantID),
SubAccountName: newTenant.Tenant,
PubAccountId: accountId,
PubAccountName: accIdInfoMap[accountId].Name,
PubName: pubInfo.PubName,
PubDbName: pubInfo.DbName,
Expand Down
5 changes: 2 additions & 3 deletions pkg/frontend/authenticate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7126,9 +7126,8 @@ func Test_doDropAccount(t *testing.T) {
bh.sql2result["commit;"] = nil
bh.sql2result["rollback;"] = nil

sql := getAccountIdNamesSql + " for update"
sql, _ := getSqlForCheckTenant(ctx, "acc")
mrs := newMrsForGetAllAccounts([][]interface{}{
{uint64(0), "sys", "open", uint64(1), nil},
{uint64(1), "acc", "open", uint64(1), nil},
})
bh.sql2result[sql] = mrs
Expand Down Expand Up @@ -7187,7 +7186,7 @@ func Test_doDropAccount(t *testing.T) {
bh.sql2result["commit;"] = nil
bh.sql2result["rollback;"] = nil

sql := getAccountIdNamesSql + " for update"
sql, _ := getSqlForCheckTenant(ctx, "acc")
bh.sql2result[sql] = newMrsForGetAllAccounts([][]interface{}{})

sql, _ = getSqlForDeleteAccountFromMoAccount(context.TODO(), mustUnboxExprStr(stmt.Name))
Expand Down
3 changes: 3 additions & 0 deletions pkg/frontend/predefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ var (

MoCatalogMoPubsDDL = `create table mo_catalog.mo_pubs (
account_id int not null,
account_name varchar(300),
pub_name varchar(64),
database_name varchar(5000),
database_id bigint unsigned,
Expand All @@ -179,8 +180,10 @@ var (

MoCatalogMoSubsDDL = `create table mo_catalog.mo_subs (
sub_account_id INT NOT NULL,
sub_account_name VARCHAR(300) NOT NULL,
sub_name VARCHAR(5000) DEFAULT NULL,
sub_time TIMESTAMP DEFAULT NULL,
pub_account_id INT NOT NULL,
pub_account_name VARCHAR(300) NOT NULL,
pub_name VARCHAR(64) NOT NULL,
pub_database VARCHAR(5000) NOT NULL,
Expand Down
Loading

0 comments on commit a9be293

Please sign in to comment.