Skip to content

Commit

Permalink
Merge pull request #203 from actiontech/issue-208-ee
Browse files Browse the repository at this point in the history
feat: add cb connection purpose
  • Loading branch information
rocky114 authored Mar 14, 2024
2 parents 7bc59a4 + 3ee3e0a commit a32b427
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 33 deletions.
82 changes: 50 additions & 32 deletions internal/dms/biz/cloudbeaver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ type CloudbeaverUser struct {

type CloudbeaverConnection struct {
DMSDBServiceID string `json:"dms_db_service_id"`
Purpose string `json:"purpose"`
DMSUserId string `json:"dms_user_id"`
DMSDBServiceFingerprint string `json:"dms_db_service_fingerprint"`
CloudbeaverConnectionID string `json:"cloudbeaver_connection_id"`
}

func (c CloudbeaverConnection) PrimaryKey() string {
return getDBPrimaryKey(c.DMSDBServiceID, c.Purpose, c.DMSUserId)
}

type CloudbeaverRepo interface {
GetCloudbeaverUserByID(ctx context.Context, cloudbeaverUserId string) (*CloudbeaverUser, bool, error)
UpdateCloudbeaverUserCache(ctx context.Context, u *CloudbeaverUser) error
Expand All @@ -66,7 +71,7 @@ type CloudbeaverRepo interface {
GetCloudbeaverConnectionsByUserIdAndDBServiceIds(ctx context.Context, userId string, dmsDBServiceIds []string) ([]*CloudbeaverConnection, error)
GetCloudbeaverConnectionsByUserId(ctx context.Context, userId string) ([]*CloudbeaverConnection, error)
UpdateCloudbeaverConnectionCache(ctx context.Context, u *CloudbeaverConnection) error
DeleteCloudbeaverConnectionCache(ctx context.Context, dbServiceId, userId string) error
DeleteCloudbeaverConnectionCache(ctx context.Context, dbServiceId, userId, purpose string) error
}

type CloudbeaverUsecase struct {
Expand Down Expand Up @@ -606,6 +611,7 @@ func (cu *CloudbeaverUsecase) connectManagement(ctx context.Context, cloudbeaver
return err
}

// 已配置的项目管理权限和数据源工作台查询权限
projectIdMap := map[string]struct{}{}
dbServiceIdMap := map[string]struct{}{}
for _, opPermission := range opPermissions {
Expand Down Expand Up @@ -657,11 +663,17 @@ func (cu *CloudbeaverUsecase) connectManagement(ctx context.Context, cloudbeaver
return nil
}

// 判断连接是否唯一的条件:dbServiceId : purpose : userUid
func getDBPrimaryKey(dbUid, purpose, userUid string) string {
// service.UID:service.AccountPurpose:userId
return fmt.Sprint(dbUid, ":", purpose, ":", userUid)
}

func (cu *CloudbeaverUsecase) operateConnection(ctx context.Context, activeDBServices []*DBService, userId string) error {
dbServiceMap := map[string]*DBService{}
projectMap := map[string]string{}
for _, service := range activeDBServices {
dbServiceMap[service.UID] = service
dbServiceMap[getDBPrimaryKey(service.UID, service.AccountPurpose, userId)] = service

project, err := cu.dbServiceUsecase.projectUsecase.GetProject(ctx, service.ProjectUID)
if err != nil {
Expand All @@ -678,34 +690,32 @@ func (cu *CloudbeaverUsecase) operateConnection(ctx context.Context, activeDBSer
return err
}

var deleteConnection []string
var deleteConnections []*CloudbeaverConnection

cloudbeaverConnectionMap := map[string]*CloudbeaverConnection{}
for _, connection := range cloudbeaverConnections {
// 删除用户关联的连接
if connection.DMSUserId == userId {
cloudbeaverConnectionMap[connection.DMSDBServiceID] = connection

if _, ok := dbServiceMap[connection.DMSDBServiceID]; !ok {
deleteConnection = append(deleteConnection, connection.DMSDBServiceID)
cloudbeaverConnectionMap[connection.PrimaryKey()] = connection
if _, ok := dbServiceMap[connection.PrimaryKey()]; !ok {
deleteConnections = append(deleteConnections, connection)
}
}
}

var createConnection []string
var updateConnection []string
createConnections, updateConnections := []*CloudbeaverConnection{}, []*CloudbeaverConnection{}

for dbServiceId, dbService := range dbServiceMap {
if cloudbeaverConnection, ok := cloudbeaverConnectionMap[dbServiceId]; ok {
for _, dbService := range dbServiceMap {
if cloudbeaverConnection, ok := cloudbeaverConnectionMap[getDBPrimaryKey(dbService.UID, dbService.AccountPurpose, userId)]; ok {
if cloudbeaverConnection.DMSDBServiceFingerprint != cu.dbServiceUsecase.GetDBServiceFingerprint(dbService) {
updateConnection = append(updateConnection, dbService.UID)
updateConnections = append(updateConnections, &CloudbeaverConnection{DMSDBServiceID: dbService.UID, Purpose: dbService.AccountPurpose, DMSUserId: userId})
}
} else {
createConnection = append(createConnection, dbService.UID)
createConnections = append(createConnections, &CloudbeaverConnection{DMSDBServiceID: dbService.UID, Purpose: dbService.AccountPurpose, DMSUserId: userId})
}
}

if len(createConnection) == 0 && len(updateConnection) == 0 && len(deleteConnection) == 0 {
if len(createConnections) == 0 && len(updateConnections) == 0 && len(deleteConnections) == 0 {
return nil
}

Expand All @@ -716,27 +726,29 @@ func (cu *CloudbeaverUsecase) operateConnection(ctx context.Context, activeDBSer
}

// 同步实例连接信息
for _, dbServiceId := range createConnection {
if err = cu.createCloudbeaverConnection(ctx, cloudbeaverClient, dbServiceMap[dbServiceId], projectMap[dbServiceId], userId); err != nil {
cu.log.Errorf("create dnServerId %s connection failed: %v", dbServiceId, err)
for _, createConnection := range createConnections {
if err = cu.createCloudbeaverConnection(ctx, cloudbeaverClient, dbServiceMap[getDBPrimaryKey(createConnection.DMSDBServiceID, createConnection.Purpose, userId)],
projectMap[createConnection.DMSDBServiceID], userId); err != nil {
cu.log.Errorf("create connection %v failed: %v", createConnection, err)
}
}

for _, dbServiceId := range updateConnection {
if err = cu.updateCloudbeaverConnection(ctx, cloudbeaverClient, cloudbeaverConnectionMap[dbServiceId].CloudbeaverConnectionID, dbServiceMap[dbServiceId], projectMap[dbServiceId], userId); err != nil {
cu.log.Errorf("update dnServerId %s to connection failed: %v", dbServiceId, err)
for _, updateConnection := range updateConnections {
if err = cu.updateCloudbeaverConnection(ctx, cloudbeaverClient, updateConnection.CloudbeaverConnectionID, dbServiceMap[getDBPrimaryKey(updateConnection.DMSDBServiceID, updateConnection.Purpose, userId)], projectMap[updateConnection.DMSDBServiceID], userId); err != nil {
cu.log.Errorf("update dnServerId %s to connection failed: %v", updateConnection, err)
}
}

for _, dbServiceId := range deleteConnection {
if err = cu.deleteCloudbeaverConnection(ctx, cloudbeaverClient, cloudbeaverConnectionMap[dbServiceId].CloudbeaverConnectionID, dbServiceId, userId); err != nil {
cu.log.Errorf("delete dbServerId %s to connection failed: %v", dbServiceId, err)
for _, deleteConnection := range deleteConnections {
if err = cu.deleteCloudbeaverConnection(ctx, cloudbeaverClient, deleteConnection.CloudbeaverConnectionID, deleteConnection.DMSDBServiceID, userId, deleteConnection.Purpose); err != nil {
cu.log.Errorf("delete connection %v failed: %v", deleteConnection, err)
}
}

return nil
}

// 删除DMS已知待删除的连接
func (cu *CloudbeaverUsecase) clearConnection(ctx context.Context) error {
cloudbeaverConnections, err := cu.repo.GetAllCloudbeaverConnections(ctx)
if err != nil {
Expand All @@ -750,7 +762,7 @@ func (cu *CloudbeaverUsecase) clearConnection(ctx context.Context) error {
}

for _, item := range cloudbeaverConnections {
if err = cu.deleteCloudbeaverConnection(ctx, cloudbeaverClient, item.CloudbeaverConnectionID, item.DMSDBServiceID, ""); err != nil {
if err = cu.deleteCloudbeaverConnection(ctx, cloudbeaverClient, item.CloudbeaverConnectionID, item.DMSDBServiceID, "", ""); err != nil {
cu.log.Errorf("delete dbServerId %s to connection failed: %v", item.DMSDBServiceID, err)

return fmt.Errorf("delete dbServerId %s to connection failed: %v", item.DMSDBServiceID, err)
Expand Down Expand Up @@ -835,7 +847,7 @@ func (cu *CloudbeaverUsecase) bindUserAccessConnection(ctx context.Context, clou
}

func (cu *CloudbeaverUsecase) createCloudbeaverConnection(ctx context.Context, client *cloudbeaver.Client, dbService *DBService, project, userId string) error {
params, err := cu.GenerateCloudbeaverConnectionParams(dbService, project, userId)
params, err := cu.GenerateCloudbeaverConnectionParams(dbService, project, dbService.AccountPurpose)
if err != nil {
return fmt.Errorf("%s unsupported", dbService.DBType)
}
Expand All @@ -858,13 +870,14 @@ func (cu *CloudbeaverUsecase) createCloudbeaverConnection(ctx context.Context, c
DMSDBServiceID: dbService.UID,
DMSUserId: userId,
DMSDBServiceFingerprint: cu.dbServiceUsecase.GetDBServiceFingerprint(dbService),
Purpose: dbService.AccountPurpose,
CloudbeaverConnectionID: resp.Connection.ID,
})
}

// UpdateCloudbeaverConnection 更新完毕后会同步缓存
func (cu *CloudbeaverUsecase) updateCloudbeaverConnection(ctx context.Context, client *cloudbeaver.Client, cloudbeaverConnectionId string, dbService *DBService, project, userId string) error {
params, err := cu.GenerateCloudbeaverConnectionParams(dbService, project, userId)
params, err := cu.GenerateCloudbeaverConnectionParams(dbService, project, dbService.AccountPurpose)
if err != nil {
return fmt.Errorf("%s unsupported", dbService.DBType)
}
Expand Down Expand Up @@ -892,11 +905,12 @@ func (cu *CloudbeaverUsecase) updateCloudbeaverConnection(ctx context.Context, c
DMSDBServiceID: dbService.UID,
DMSUserId: userId,
DMSDBServiceFingerprint: cu.dbServiceUsecase.GetDBServiceFingerprint(dbService),
Purpose: dbService.AccountPurpose,
CloudbeaverConnectionID: resp.Connection.ID,
})
}

func (cu *CloudbeaverUsecase) deleteCloudbeaverConnection(ctx context.Context, client *cloudbeaver.Client, cloudbeaverConnectionId, dbServiceId, userId string) error {
func (cu *CloudbeaverUsecase) deleteCloudbeaverConnection(ctx context.Context, client *cloudbeaver.Client, cloudbeaverConnectionId, dbServiceId, userId, purpose string) error {
variables := make(map[string]interface{})
variables["connectionId"] = cloudbeaverConnectionId
variables["projectId"] = cloudbeaverProjectId
Expand All @@ -910,13 +924,17 @@ func (cu *CloudbeaverUsecase) deleteCloudbeaverConnection(ctx context.Context, c
return err
}

return cu.repo.DeleteCloudbeaverConnectionCache(ctx, dbServiceId, userId)
return cu.repo.DeleteCloudbeaverConnectionCache(ctx, dbServiceId, userId, purpose)
}

func (cu *CloudbeaverUsecase) generateCommonCloudbeaverConfigParams(dbService *DBService, project, userId string) map[string]interface{} {
func (cu *CloudbeaverUsecase) generateCommonCloudbeaverConfigParams(dbService *DBService, project, purpose string) map[string]interface{} {
name := fmt.Sprintf("%s:%s", project, dbService.Name)
if purpose != "" {
name = fmt.Sprintf("%s:%s", name, purpose)
}
return map[string]interface{}{
"configurationType": "MANUAL",
"name": fmt.Sprintf("%s:%s:%s", project, dbService.Name, userId),
"name": name,
"template": false,
"host": dbService.Host,
"port": dbService.Port,
Expand All @@ -933,9 +951,9 @@ func (cu *CloudbeaverUsecase) generateCommonCloudbeaverConfigParams(dbService *D

const cloudbeaverProjectId = "g_GlobalConfiguration"

func (cu *CloudbeaverUsecase) GenerateCloudbeaverConnectionParams(dbService *DBService, project string, userId string) (map[string]interface{}, error) {
func (cu *CloudbeaverUsecase) GenerateCloudbeaverConnectionParams(dbService *DBService, project string, purpose string) (map[string]interface{}, error) {
var err error
config := cu.generateCommonCloudbeaverConfigParams(dbService, project, userId)
config := cu.generateCommonCloudbeaverConfigParams(dbService, project, purpose)

dbType, err := constant.ParseDBType(dbService.DBType)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/dms/biz/db_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type DBService struct {

// sqle config
SQLEConfig *SQLEConfig
// PROV config
AccountPurpose string
}

type DBTypeCount struct {
Expand Down
5 changes: 4 additions & 1 deletion internal/dms/storage/cloudbeaver.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,15 @@ func (cr *CloudbeaverRepo) UpdateCloudbeaverConnectionCache(ctx context.Context,
})
}

func (cr *CloudbeaverRepo) DeleteCloudbeaverConnectionCache(ctx context.Context, dbServiceId, userId string) error {
func (cr *CloudbeaverRepo) DeleteCloudbeaverConnectionCache(ctx context.Context, dbServiceId, userId, purpose string) error {
return transaction(cr.log, ctx, cr.db, func(tx *gorm.DB) error {
db := tx.WithContext(ctx).Where("dms_db_service_id = ?", dbServiceId)
if len(userId) > 0 {
db = db.Where("dms_user_id = ?", userId)
}
if len(purpose) > 0 {
db = db.Where("purpose = ?", purpose)
}
if err := db.Delete(&model.CloudbeaverConnectionCache{}).Error; err != nil {
return fmt.Errorf("failed to delete cloudbeaver db Service: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/dms/storage/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func convertBizCloudbeaverConnection(u *biz.CloudbeaverConnection) *model.Cloudb
DMSDBServiceID: u.DMSDBServiceID,
DMSUserID: u.DMSUserId,
DMSDBServiceFingerprint: u.DMSDBServiceFingerprint,
Purpose: u.Purpose,
CloudbeaverConnectionID: u.CloudbeaverConnectionID,
}
}
Expand Down Expand Up @@ -300,6 +301,7 @@ func convertModelCloudbeaverConnection(items []*model.CloudbeaverConnectionCache
DMSDBServiceID: item.DMSDBServiceID,
DMSUserId: item.DMSUserID,
DMSDBServiceFingerprint: item.DMSDBServiceFingerprint,
Purpose: item.Purpose,
CloudbeaverConnectionID: item.CloudbeaverConnectionID,
})
}
Expand Down
1 change: 1 addition & 0 deletions internal/dms/storage/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ type CloudbeaverConnectionCache struct {
DMSUserID string `json:"dms_user_id" gorm:"column:dms_user_id;primaryKey"`
DMSDBServiceFingerprint string `json:"dms_db_service_fingerprint" gorm:"size:255;column:dms_db_service_fingerprint"`
CloudbeaverConnectionID string `json:"cloudbeaver_connection_id" gorm:"size:255;column:cloudbeaver_connection_id"`
Purpose string `json:"purpose" gorm:"size:20;column:purpose;primaryKey"`
}

type DatabaseSourceService struct {
Expand Down

0 comments on commit a32b427

Please sign in to comment.