Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate逻辑挪到cmdb小工具中 #7781

Draft
wants to merge 1 commit into
base: v3.13.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions scripts/init_db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,4 @@
#!/bin/bash
set -e

# get local IP.
localIp=`python ip.py`

# 判断是否为IPV6,是则在地址两端加中括号
if [[ ${localIp} =~ ":" ]]
then
localIp="[${localIp}]"
fi
echo "localIp:${localIp}"

curl -X POST -H 'Content-Type:application/json' -H 'BK_USER:migrate' -H 'HTTP_BLUEKING_SUPPLIER_ID:0' http://${localIp}:60004/migrate/v3/migrate/community/0

echo ""
./tool_ctl/tool_ctl migrate db --config-path=cmdb_adminserver/configures/migrate.yaml --enable-auth=false
1 change: 0 additions & 1 deletion src/apimachinery/adminserver/adminserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
type AdminServerClientInterface interface {
ClearDatabase(ctx context.Context, h http.Header) (resp *metadata.Response, err error)
Set(ctx context.Context, ownerID string, h http.Header) (resp *metadata.Response, err error)
Migrate(ctx context.Context, ownerID string, distribution string, h http.Header) error
RunSyncDBIndex(ctx context.Context, h http.Header) error
}

Expand Down
24 changes: 0 additions & 24 deletions src/apimachinery/adminserver/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,30 +50,6 @@ func (a *adminServer) Set(ctx context.Context, ownerID string, h http.Header) (r
return
}

// Migrate TODO
func (a *adminServer) Migrate(ctx context.Context, ownerID string, distribution string, h http.Header) error {
resp := new(metadata.Response)
subPath := "/migrate/%s/%s"

err := a.client.Post().
WithContext(ctx).
Body(nil).
SubResourcef(subPath, distribution, ownerID).
WithHeaders(h).
Do().
Into(resp)

if err != nil {
return errors.CCHttpError
}

if err = resp.CCError(); err != nil {
return err
}

return nil
}

// RunSyncDBIndex TODO
func (a *adminServer) RunSyncDBIndex(ctx context.Context, h http.Header) error {
resp := new(metadata.Response)
Expand Down
275 changes: 0 additions & 275 deletions src/scene_server/admin_server/service/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,275 +17,16 @@ import (
"fmt"
"net/http"
"path/filepath"
"time"

"configcenter/src/common"
"configcenter/src/common/blog"
"configcenter/src/common/mapstr"
"configcenter/src/common/metadata"
"configcenter/src/common/types"
"configcenter/src/common/util"
"configcenter/src/common/version"
"configcenter/src/common/watch"
"configcenter/src/scene_server/admin_server/upgrader"
"configcenter/src/source_controller/cacheservice/event"
daltypes "configcenter/src/storage/dal/types"
streamtypes "configcenter/src/storage/stream/types"

"github.com/emicklei/go-restful/v3"
"go.mongodb.org/mongo-driver/bson"
)

func (s *Service) migrate(req *restful.Request, resp *restful.Response) {
rHeader := req.Request.Header
rid := util.GetHTTPCCRequestID(rHeader)
defErr := s.CCErr.CreateDefaultCCErrorIf(util.GetLanguage(rHeader))
ownerID := common.BKDefaultOwnerID
updateCfg := &upgrader.Config{
OwnerID: ownerID,
User: common.CCSystemOperatorUserName,
}

if err := s.createWatchDBChainCollections(rid); err != nil {
blog.Errorf("create watch db chain collections failed, err: %v, rid: %s", err, rid)
result := &metadata.RespError{
Msg: defErr.Errorf(common.CCErrCommMigrateFailed, err.Error()),
}
resp.WriteError(http.StatusInternalServerError, result)
return
}

preVersion, finishedVersions, err := upgrader.Upgrade(s.ctx, s.db, s.cache, s.iam, updateCfg)
if err != nil {
blog.Errorf("db upgrade failed, err: %+v, rid: %s", err, rid)
result := &metadata.RespError{
Msg: defErr.Errorf(common.CCErrCommMigrateFailed, err.Error()),
}
resp.WriteError(http.StatusInternalServerError, result)
return
}

currentVersion := preVersion
if len(finishedVersions) > 0 {
currentVersion = finishedVersions[len(finishedVersions)-1]
}

result := MigrationResponse{
BaseResp: metadata.BaseResp{
Result: true,
Code: 0,
ErrMsg: "",
Permissions: nil,
},
Data: "migrate success",
PreVersion: preVersion,
CurrentVersion: currentVersion,
FinishedVersions: finishedVersions,
}
resp.WriteEntity(result)
}

// dbChainTTLTime the ttl time seconds of the db event chain, used to set the ttl index of mongodb
const dbChainTTLTime = 5 * 24 * 60 * 60

func (s *Service) createWatchDBChainCollections(rid string) error {
// create watch token table to store the last watch token info for every collections
exists, err := s.watchDB.HasTable(s.ctx, common.BKTableNameWatchToken)
if err != nil {
blog.Errorf("check if table %s exists failed, err: %v, rid: %s", common.BKTableNameWatchToken, err, rid)
return err
}

if !exists {
err = s.watchDB.CreateTable(s.ctx, common.BKTableNameWatchToken)
if err != nil && !s.watchDB.IsDuplicatedError(err) {
blog.Errorf("create table %s failed, err: %v, rid: %s", common.BKTableNameWatchToken, err, rid)
return err
}
}

// create watch chain node table and init the last token info as empty for all collections
cursorTypes := watch.ListCursorTypes()
for _, cursorType := range cursorTypes {
key, err := event.GetResourceKeyWithCursorType(cursorType)
if err != nil {
blog.Errorf("get resource key with cursor type %s failed, err: %v, rid: %s", cursorType, err, rid)
return err
}

exists, err := s.watchDB.HasTable(s.ctx, key.ChainCollection())
if err != nil {
blog.Errorf("check if table %s exists failed, err: %v, rid: %s", key.ChainCollection(), err, rid)
return err
}

if !exists {
err = s.watchDB.CreateTable(s.ctx, key.ChainCollection())
if err != nil && !s.watchDB.IsDuplicatedError(err) {
blog.Errorf("create table %s failed, err: %v, rid: %s", key.ChainCollection(), err, rid)
return err
}
}

if err = s.createWatchIndexes(cursorType, key, rid); err != nil {
return err
}

if err = s.createWatchToken(key); err != nil {
return err
}
}
return nil
}

func (s *Service) createWatchIndexes(cursorType watch.CursorType, key event.Key, rid string) error {
indexes := []daltypes.Index{
{Name: "index_id", Keys: bson.D{{common.BKFieldID, -1}}, Background: true, Unique: true},
{Name: "index_cursor", Keys: bson.D{{common.BKCursorField, -1}}, Background: true, Unique: true},
{Name: "index_cluster_time", Keys: bson.D{{common.BKClusterTimeField, -1}}, Background: true,
ExpireAfterSeconds: dbChainTTLTime},
}

if cursorType == watch.ObjectBase || cursorType == watch.MainlineInstance || cursorType == watch.InstAsst {
subResourceIndex := daltypes.Index{
Name: "index_sub_resource", Keys: bson.D{{common.BKSubResourceField, 1}}, Background: true,
}
indexes = append(indexes, subResourceIndex)
}

existIndexArr, err := s.watchDB.Table(key.ChainCollection()).Indexes(s.ctx)
if err != nil {
blog.Errorf("get exist indexes for table %s failed, err: %v, rid: %s", key.ChainCollection(), err, rid)
return err
}

existIdxMap := make(map[string]bool)
for _, index := range existIndexArr {
existIdxMap[index.Name] = true
}

for _, index := range indexes {
if _, exist := existIdxMap[index.Name]; exist {
continue
}

err = s.watchDB.Table(key.ChainCollection()).CreateIndex(s.ctx, index)
if err != nil && !s.watchDB.IsDuplicatedError(err) {
blog.Errorf("create indexes for table %s failed, err: %v, rid: %s", key.ChainCollection(), err, rid)
return err
}
}
return nil
}

func (s *Service) createWatchToken(key event.Key) error {
filter := map[string]interface{}{
"_id": key.Collection(),
}

count, err := s.watchDB.Table(common.BKTableNameWatchToken).Find(filter).Count(s.ctx)
if err != nil {
blog.Errorf("check if last watch token exists failed, err: %v, filter: %+v", err, filter)
return err
}

if count > 0 {
return nil
}

if key.Collection() == event.HostIdentityKey.Collection() {
// host identity's watch token is different with other identity.
// only set coll is ok, the other fields is useless
data := mapstr.MapStr{
"_id": key.Collection(),
common.BKTableNameBaseHost: watch.LastChainNodeData{Coll: common.BKTableNameBaseHost},
common.BKTableNameModuleHostConfig: watch.LastChainNodeData{Coll: common.BKTableNameModuleHostConfig},
common.BKTableNameBaseProcess: watch.LastChainNodeData{Coll: common.BKTableNameBaseProcess},
}
if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(s.ctx, data); err != nil {
blog.Errorf("init last watch token failed, err: %v, data: %+v", err, data)
return err
}
return nil
}

if key.Collection() == event.BizSetRelationKey.Collection() {
// biz set relation's watch token is generated in the same way with the host identity's watch token
data := mapstr.MapStr{
"_id": key.Collection(),
common.BKTableNameBaseApp: watch.LastChainNodeData{Coll: common.BKTableNameBaseApp},
common.BKTableNameBaseBizSet: watch.LastChainNodeData{Coll: common.BKTableNameBaseBizSet},
common.BKFieldID: 0,
common.BKTokenField: "",
}
if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(s.ctx, data); err != nil {
blog.Errorf("init last biz set relation watch token failed, err: %v, data: %+v", err, data)
return err
}
return nil
}

data := watch.LastChainNodeData{
Coll: key.Collection(),
Token: "",
StartAtTime: streamtypes.TimeStamp{
Sec: uint32(time.Now().Unix()),
Nano: 0,
},
}
if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(s.ctx, data); err != nil {
blog.Errorf("init last watch token failed, err: %v, data: %+v", err, data)
return err
}
return nil
}

func (s *Service) migrateSpecifyVersion(req *restful.Request, resp *restful.Response) {
rHeader := req.Request.Header
rid := util.GetHTTPCCRequestID(rHeader)
defErr := s.CCErr.CreateDefaultCCErrorIf(util.GetLanguage(rHeader))
ownerID := common.BKDefaultOwnerID
updateCfg := &upgrader.Config{
OwnerID: ownerID,
User: common.CCSystemOperatorUserName,
}

input := new(MigrateSpecifyVersionRequest)
if err := json.NewDecoder(req.Request.Body).Decode(input); err != nil {
blog.Errorf("migrateSpecifyVersion failed, decode body err: %v, body:%+v,rid:%s", err, req.Request.Body, rid)
_ = resp.WriteError(http.StatusOK, &metadata.RespError{Msg: defErr.Error(common.CCErrCommJSONUnmarshalFailed)})
return
}

if input.CommitID != version.CCGitHash {
_ = resp.WriteError(http.StatusOK,
&metadata.RespError{Msg: defErr.Errorf(common.CCErrCommParamsInvalid, "commit_id")})
return
}

err := upgrader.UpgradeSpecifyVersion(s.ctx, s.db, s.cache, s.iam, updateCfg, input.Version)
if err != nil {
blog.Errorf("db upgrade specify failed, err: %+v, rid: %s", err, rid)
result := &metadata.RespError{
Msg: defErr.Errorf(common.CCErrCommMigrateFailed, err.Error()),
}
resp.WriteError(http.StatusInternalServerError, result)
return
}

result := MigrationResponse{
BaseResp: metadata.BaseResp{
Result: true,
Code: 0,
ErrMsg: "",
Permissions: nil,
},
Data: "migrate success. version: " + input.Version,
}
resp.WriteEntity(result)

}

var allConfigNames = map[string]bool{
"redis": true,
"mongodb": true,
Expand Down Expand Up @@ -350,19 +91,3 @@ func (s *Service) refreshConfig(req *restful.Request, resp *restful.Response) {
blog.Infof("refresh config success, input:%#v", input)
resp.WriteEntity(metadata.NewSuccessResp("refresh config success"))
}

// MigrationResponse TODO
type MigrationResponse struct {
metadata.BaseResp `json:",inline"`
Data interface{} `json:"data"`
PreVersion string `json:"pre_version"`
CurrentVersion string `json:"current_version"`
FinishedVersions []string `json:"finished_migrations"`
}

// MigrateSpecifyVersionRequest TODO
type MigrateSpecifyVersionRequest struct {
CommitID string `json:"commit_id"`
TimeStamp int64 `json:"time_stamp"`
Version string `json:"version"`
}
2 changes: 0 additions & 2 deletions src/scene_server/admin_server/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (s *Service) WebService() *restful.Container {

api.Route(api.POST("/authcenter/init").To(s.InitAuthCenter))
api.Route(api.POST("/authcenter/register").To(s.RegisterAuthAccount))
api.Route(api.POST("/migrate/{distribution}/{ownerID}").To(s.migrate))
api.Route(api.POST("/migrate/system/hostcrossbiz/{ownerID}").To(s.SetSystemConfiguration))
api.Route(api.POST("/migrate/system/user_config/{key}/{can}").To(s.UserConfigSwitch))
api.Route(api.GET("/find/system/config_admin").To(s.SearchConfigAdmin))
Expand All @@ -105,7 +104,6 @@ func (s *Service) WebService() *restful.Container {
api.Route(api.PUT("/update/system_config/platform_setting").To(s.UpdatePlatformSettingConfig))
api.Route(api.GET("/find/system_config/platform_setting/{type}").To(s.SearchPlatformSettingConfig))

api.Route(api.POST("/migrate/specify/version/{distribution}/{ownerID}").To(s.migrateSpecifyVersion))
api.Route(api.POST("/migrate/config/refresh").To(s.refreshConfig))
api.Route(api.POST("/migrate/dataid").To(s.migrateDataID))
api.Route(api.POST("/migrate/old/dataid").To(s.migrateOldDataID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* limitations under the License.
*/

package main
package imports

import (

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* limitations under the License.
*/

package main
package imports

import (
_ "configcenter/src/scene_server/admin_server/upgrader/history/v3.0.8"
Expand Down
Loading