Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cherry-pick] add columns to mo_pubs/subs #21059

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions pkg/bootstrap/versions/v2_0_2/cluster_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (

var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_cdc_watermark,
upg_mo_pubs_add_account_name,
upg_mo_subs_add_sub_account_name,
upg_mo_subs_add_pub_account_id,
}

var upg_mo_cdc_watermark = versions.UpgradeEntry{
Expand All @@ -37,3 +40,48 @@ var upg_mo_cdc_watermark = versions.UpgradeEntry{
return !colInfo.IsExits, nil
},
}

var upg_mo_pubs_add_account_name = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_PUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_pubs add column account_name varchar(300) after account_id",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_PUBS, "account_name")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_pubs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.account_id = t2.account_id SET t1.account_name = t2.account_name",
}

var upg_mo_subs_add_sub_account_name = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_SUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_subs add column sub_account_name VARCHAR(300) NOT NULL after sub_account_id",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_SUBS, "sub_account_name")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_subs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.sub_account_id = t2.account_id SET t1.sub_account_name = t2.account_name",
}

var upg_mo_subs_add_pub_account_id = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_SUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_subs add column pub_account_id INT NOT NULL after sub_time",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_SUBS, "pub_account_id")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_subs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.pub_account_name = t2.account_name SET t1.pub_account_id = t2.account_id",
}
2 changes: 2 additions & 0 deletions pkg/common/pubsub/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ func (pubInfo *PubInfo) GetCreateSql() string {

type SubInfo struct {
SubAccountId int32
SubAccountName string
SubName string
SubTime string
PubAccountId int32
PubAccountName string
PubName string
PubDbName string
Expand Down
55 changes: 37 additions & 18 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3613,24 +3613,46 @@ func doDropAccount(ctx context.Context, bh BackgroundExec, ses *Session, da *dro
return moerr.NewInternalErrorf(ctx, "can not delete the account %s", da.Name)
}

checkAccount := func(accountName string) (accountId int64, version uint64, ok bool, err error) {
if sql, err = getSqlForCheckTenant(ctx, da.Name); err != nil {
return
}

bh.ClearExecResultSet()
if err = bh.Exec(ctx, sql); err != nil {
return
}

if erArray, err = getResultSet(ctx, bh); err != nil {
return
}

if execResultArrayHasData(erArray) {
if accountId, err = erArray[0].GetInt64(ctx, 0, 0); err != nil {
return
}
if version, err = erArray[0].GetUint64(ctx, 0, 3); err != nil {
return
}
ok = true
}

return
}

dropAccountFunc := func() (rtnErr error) {
ses.Infof(ctx, "dropAccount %s sql: %s", da.Name, getAccountIdNamesSql)
_, nameInfoMap, rtnErr := getAccounts(ctx, bh, true)
if rtnErr != nil {
return rtnErr
if accountId, version, hasAccount, rtnErr = checkAccount(da.Name); rtnErr != nil {
return
}

//check the account exists or not
if _, ok := nameInfoMap[da.Name]; !ok {
//no such account
if !da.IfExists { //when the "IF EXISTS" is set, just skip it.
// check the account exists or not
if !hasAccount {
// when the "IF EXISTS" is set, just skip it.
if !da.IfExists {
rtnErr = moerr.NewInternalErrorf(ctx, "there is no account %s", da.Name)
}
hasAccount = false
return
}
accountId = int64(nameInfoMap[da.Name].Id)
version = nameInfoMap[da.Name].Version

//drop tables of the tenant
//NOTE!!!: single DDL drop statement per single transaction
Expand Down Expand Up @@ -3661,7 +3683,7 @@ func doDropAccount(ctx context.Context, bh BackgroundExec, ses *Session, da *dro
}
for _, pubInfo := range pubInfos {
ses.Infof(ctx, "dropAccount %s sql: %s", da.Name, pubInfo.PubName)
if rtnErr = dropPublication(deleteCtx, bh, true, pubInfo.PubName); rtnErr != nil {
if rtnErr = dropPublication(deleteCtx, bh, true, pubInfo.PubAccountName, pubInfo.PubName); rtnErr != nil {
return
}
}
Expand Down Expand Up @@ -3721,13 +3743,8 @@ func doDropAccount(ctx context.Context, bh BackgroundExec, ses *Session, da *dro
return rtnErr
}
for _, subInfo := range subInfos {
pubAccInfo, ok := nameInfoMap[subInfo.PubAccountName]
if !ok {
continue
}

ses.Infof(ctx, "dropAccount %s sql: %s %s", da.Name, updatePubInfoAccountListFormat, subInfo.PubName)
if rtnErr = dropSubAccountNameInSubAccounts(deleteCtx, bh, pubAccInfo.Id, subInfo.PubName, da.Name); rtnErr != nil {
if rtnErr = dropSubAccountNameInSubAccounts(deleteCtx, bh, subInfo.PubAccountId, subInfo.PubName, da.Name); rtnErr != nil {
return rtnErr
}
}
Expand Down Expand Up @@ -7691,6 +7708,8 @@ func createSubscription(ctx context.Context, bh BackgroundExec, newTenant *Tenan
for _, pubInfo := range pubInfos {
subInfo := &pubsub.SubInfo{
SubAccountId: int32(newTenant.TenantID),
SubAccountName: newTenant.Tenant,
PubAccountId: accountId,
PubAccountName: accIdInfoMap[accountId].Name,
PubName: pubInfo.PubName,
PubDbName: pubInfo.DbName,
Expand Down
5 changes: 2 additions & 3 deletions pkg/frontend/authenticate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7126,9 +7126,8 @@ func Test_doDropAccount(t *testing.T) {
bh.sql2result["commit;"] = nil
bh.sql2result["rollback;"] = nil

sql := getAccountIdNamesSql + " for update"
sql, _ := getSqlForCheckTenant(ctx, "acc")
mrs := newMrsForGetAllAccounts([][]interface{}{
{uint64(0), "sys", "open", uint64(1), nil},
{uint64(1), "acc", "open", uint64(1), nil},
})
bh.sql2result[sql] = mrs
Expand Down Expand Up @@ -7187,7 +7186,7 @@ func Test_doDropAccount(t *testing.T) {
bh.sql2result["commit;"] = nil
bh.sql2result["rollback;"] = nil

sql := getAccountIdNamesSql + " for update"
sql, _ := getSqlForCheckTenant(ctx, "acc")
bh.sql2result[sql] = newMrsForGetAllAccounts([][]interface{}{})

sql, _ = getSqlForDeleteAccountFromMoAccount(context.TODO(), mustUnboxExprStr(stmt.Name))
Expand Down
3 changes: 3 additions & 0 deletions pkg/frontend/predefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ var (

MoCatalogMoPubsDDL = `create table mo_catalog.mo_pubs (
account_id int not null,
account_name varchar(300),
pub_name varchar(64),
database_name varchar(5000),
database_id bigint unsigned,
Expand All @@ -179,8 +180,10 @@ var (

MoCatalogMoSubsDDL = `create table mo_catalog.mo_subs (
sub_account_id INT NOT NULL,
sub_account_name VARCHAR(300) NOT NULL,
sub_name VARCHAR(5000) DEFAULT NULL,
sub_time TIMESTAMP DEFAULT NULL,
pub_account_id INT NOT NULL,
pub_account_name VARCHAR(300) NOT NULL,
pub_name VARCHAR(64) NOT NULL,
pub_database VARCHAR(5000) NOT NULL,
Expand Down
Loading
Loading