Skip to content

Commit

Permalink
sync to 1.1: backward compatibility with the old storage usage request meth…
Browse files Browse the repository at this point in the history
…od (#14452)

Backward compatibility with the old storage usage request method.

two changes in the new version:

All the RPC method codes have been changed.
The new version is unable to handle the response of an old-version TN.
This fix adds a retry with the old method and handler when a "not supported" error occurs.

Approved by: @XuPeng-SH, @qingxinhome, @sukki37
  • Loading branch information
gouhongshen authored Jan 30, 2024
1 parent 8f8ac5a commit 2a1c5ac
Show file tree
Hide file tree
Showing 5 changed files with 946 additions and 105 deletions.
116 changes: 99 additions & 17 deletions pkg/frontend/show_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@ import (
"strings"
"time"

v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function/ctl"
"github.com/matrixorigin/matrixone/pkg/txn/client"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

Expand Down Expand Up @@ -132,7 +133,7 @@ func getSqlForTableStats(accountId int32) string {
return fmt.Sprintf(getTableStatsFormatV2, catalog.SystemPartitionRel, accountId)
}

func requestStorageUsage(ses *Session, accIds [][]int32) (resp any, err error) {
func requestStorageUsage(ses *Session, accIds [][]int32) (resp any, tried bool, err error) {
whichTN := func(string) ([]uint64, error) { return nil, nil }
payload := func(tnShardID uint64, parameter string, proc *process.Process) ([]byte, error) {
req := db.StorageUsageReq{}
Expand All @@ -154,7 +155,7 @@ func requestStorageUsage(ses *Session, accIds [][]int32) (resp any, err error) {
var ctx context.Context
var txnOperator client.TxnOperator
if ctx, txnOperator, err = ses.txnHandler.GetTxn(); err != nil {
return nil, err
return nil, false, err
}

// create a new proc for `handler`
Expand All @@ -168,12 +169,77 @@ func requestStorageUsage(ses *Session, accIds [][]int32) (resp any, err error) {
handler := ctl.GetTNHandlerFunc(api.OpCode_OpStorageUsage, whichTN, payload, responseUnmarshaler)
result, err := handler(proc, "DN", "", ctl.MoCtlTNCmdSender)
if moerr.IsMoErrCode(err, moerr.ErrNotSupported) {
return nil, moerr.NewNotSupportedNoCtx("current tn version not supported `show accounts`")
} else if err != nil {
return nil, err
// try the previous RPC method
payload_V0 := func(tnShardID uint64, parameter string, proc *process.Process) ([]byte, error) { return nil, nil }
responseUnmarshaler_V0 := func(payload []byte) (interface{}, error) {
usage := &db.StorageUsageResp_V0{}
if err := usage.Unmarshal(payload); err != nil {
return nil, err
}
return usage, nil
}

tried = true
CmdMethod_StorageUsage := api.OpCode(14)
handler = ctl.GetTNHandlerFunc(CmdMethod_StorageUsage, whichTN, payload_V0, responseUnmarshaler_V0)
result, err = handler(proc, "DN", "", ctl.MoCtlTNCmdSender)

if moerr.IsMoErrCode(err, moerr.ErrNotSupported) {
return nil, tried, moerr.NewNotSupportedNoCtx("current tn version not supported `show accounts`")
}
}

if err != nil {
return nil, tried, err
}

return result.Data.([]any)[0], tried, nil
}

// handleStorageUsageResponse_V0 aggregates per-account storage usage from the
// response of the previous (V0) storage usage RPC method.
//
// For every entry in usage.CkpEntries it loads the storage usage batch of that
// checkpoint through fs and adds each row's object size to the row's account.
// It then adds the sizes carried directly by usage.BlockEntries.
//
// It returns a map from account id to the accumulated size. When any
// checkpoint is older than logtail.CheckpointVersion9 an empty map is returned
// instead of a partial sum. An error is returned when a checkpoint batch cannot
// be loaded, or when the response contains a nil entry or a block entry with
// fewer than four values.
func handleStorageUsageResponse_V0(ctx context.Context, fs fileservice.FileService,
	usage *db.StorageUsageResp_V0) (map[int32]uint64, error) {
	result := make(map[int32]uint64)

	for _, ckp := range usage.CkpEntries {
		if ckp == nil {
			// a malformed response must not crash the session
			return nil, moerr.NewInternalErrorNoCtx("malformed checkpoint entry in storage usage response")
		}

		// storage usage was introduced after `CheckpointVersion9`
		if ckp.Version < logtail.CheckpointVersion9 {
			// An older checkpoint carries no storage usage data. To avoid
			// reporting misleading, partially summed numbers, return an
			// empty result instead.
			logutil.Info("[storage usage]: found older ckp when handle storage usage response")
			return map[int32]uint64{}, nil
		}

		ckpData, err := logtail.LoadSpecifiedCkpBatch(ctx, ckp.Location, ckp.Version, logtail.StorageUsageInsIDX, fs)
		if err != nil {
			return nil, err
		}

		storageUsageBat := ckpData.GetBatches()[logtail.StorageUsageInsIDX]
		accIDVec := vector.MustFixedCol[uint64](
			storageUsageBat.GetVectorByName(catalog.SystemColAttr_AccID).GetDownstreamVector(),
		)
		sizeVec := vector.MustFixedCol[uint64](
			storageUsageBat.GetVectorByName(logtail.CheckpointMetaAttr_ObjectSize).GetDownstreamVector(),
		)

		for i := range accIDVec {
			result[int32(accIDVec[i])] += sizeVec[i]
		}

		// release the loaded checkpoint data before moving to the next entry
		ckpData.Close()
	}

	// [account_id, db_id, table_id, obj_id, table_total_size]
	// NOTE(review): the layout comment above lists the size as the fifth value,
	// while the code reads index 3 — confirm against the V0 TN handler.
	for _, info := range usage.BlockEntries {
		if info == nil || len(info.Info) < 4 {
			// indexing a short slice would panic; report the bad payload instead
			return nil, moerr.NewInternalErrorNoCtx("malformed block entry in storage usage response")
		}
		result[int32(info.Info[0])] += info.Info[3]
	}

	return result, nil
}

func handleStorageUsageResponse(
Expand Down Expand Up @@ -249,20 +315,36 @@ func getAccountsStorageUsage(ctx context.Context, ses *Session, accIds [][]int32
}

// step 2: query to tn
response, err := requestStorageUsage(ses, accIds)
response, tried, err := requestStorageUsage(ses, accIds)
if err != nil {
return nil, err
}

usage, ok := response.(*db.StorageUsageResp)
if !ok || usage.Magic != logtail.StorageUsageMagic {
return nil, moerr.NewNotSupportedNoCtx("cn version newer than tn, retry later")
}
if tried {
usage, ok := response.(*db.StorageUsageResp_V0)
if !ok {
return nil, moerr.NewInternalErrorNoCtx("storage usage response decode failed, retry later")
}

fs, err := fileservice.Get[fileservice.FileService](ses.GetParameterUnit().FileService, defines.SharedFileServiceName)
if err != nil {
return nil, err
}

updateStorageUsageCache(usage.AccIds, usage.Sizes)
// step 3: handling these pulled data
return handleStorageUsageResponse_V0(ctx, fs, usage)

// step 3: handling these pulled data
return handleStorageUsageResponse(ctx, usage)
} else {
usage, ok := response.(*db.StorageUsageResp)
if !ok || usage.Magic != logtail.StorageUsageMagic {
return nil, moerr.NewInternalErrorNoCtx("storage usage response decode failed, retry later")
}

updateStorageUsageCache(usage.AccIds, usage.Sizes)

// step 3: handling these pulled data
return handleStorageUsageResponse(ctx, usage)
}
}

func embeddingSizeToBatch(ori *batch.Batch, size uint64, mp *mpool.MPool) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/vm/engine/tae/db/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,37 @@ func (s *StorageUsageReq) UnmarshalBinary(data []byte) error {
return s.Unmarshal(data)
}

// BlockMetaInfo is the element type of StorageUsageResp_V0.BlockEntries: one
// entry of raw uint64 values in the V0 storage usage response.
type BlockMetaInfo struct {
// Info holds the raw values of the entry. The frontend reads Info[0] as the
// account id and adds Info[3] to that account's size.
// NOTE(review): the full layout is defined by the sender — confirm.
Info []uint64
}

// MarshalBinary encodes b by delegating to its Marshal method and returns the
// encoded bytes together with any encoding error. The signature matches
// encoding.BinaryMarshaler.
func (b *BlockMetaInfo) MarshalBinary() ([]byte, error) {
	encoded, err := b.Marshal()
	return encoded, err
}

// UnmarshalBinary decodes data into b by delegating to its Unmarshal method
// and returns that method's error. The signature matches
// encoding.BinaryUnmarshaler.
func (b *BlockMetaInfo) UnmarshalBinary(data []byte) error {
	err := b.Unmarshal(data)
	return err
}

// CkpMetaInfo is the element type of StorageUsageResp_V0.CkpEntries: it
// identifies one checkpoint referenced by the V0 storage usage response.
type CkpMetaInfo struct {
// Version is the checkpoint version. The frontend compares it against
// logtail.CheckpointVersion9 before loading the checkpoint.
Version uint32
// Location is passed, together with Version, to
// logtail.LoadSpecifiedCkpBatch to load the checkpoint's batch.
Location []byte
}

// MarshalBinary encodes c by delegating to its Marshal method and returns the
// encoded bytes together with any encoding error. The signature matches
// encoding.BinaryMarshaler.
func (c *CkpMetaInfo) MarshalBinary() ([]byte, error) {
	encoded, err := c.Marshal()
	return encoded, err
}

// UnmarshalBinary decodes data into c by delegating to its Unmarshal method
// and returns that method's error. The signature matches
// encoding.BinaryUnmarshaler.
func (c *CkpMetaInfo) UnmarshalBinary(data []byte) error {
	err := c.Unmarshal(data)
	return err
}

// StorageUsageResp_V0 is the response shape of the previous storage usage RPC
// method. The frontend decodes into it when it retries with the old method
// after the current one is reported as not supported.
type StorageUsageResp_V0 struct {
// Succeed presumably reports whether the request was handled — TODO confirm;
// it is not read by the visible frontend code.
Succeed bool
// CkpEntries lists the checkpoints whose storage usage batches the frontend
// loads and sums per account.
CkpEntries []*CkpMetaInfo
// BlockEntries lists additional entries whose sizes the frontend adds to the
// per-account totals.
BlockEntries []*BlockMetaInfo
}

type StorageUsageResp struct {
Succeed bool
AccIds []int32
Expand Down
Loading

0 comments on commit 2a1c5ac

Please sign in to comment.