Skip to content

Commit

Permalink
feat: supports batch update when syncing resource
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhengYa-0110 committed Sep 13, 2024
1 parent ecaf35d commit ba586c0
Show file tree
Hide file tree
Showing 46 changed files with 1,006 additions and 58 deletions.
4 changes: 4 additions & 0 deletions server/controller/recorder/common/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (m *Metadata) Copy() *Metadata {
}
}

func (m *Metadata) GetDB() *mysql.DB {
return m.DB
}

func (m *Metadata) GetORGID() int {
return m.ORGID
}
Expand Down
33 changes: 31 additions & 2 deletions server/controller/recorder/db/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package db
import (
"time"

"gorm.io/gorm/clause"

"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
rcommon "github.com/deepflowio/deepflow/server/controller/recorder/common"
Expand All @@ -34,10 +36,14 @@ type Operator[MPT constraint.MySQLModelPtr[MT], MT constraint.MySQLModel] interf
AddBatch(dbItems []*MT) ([]*MT, bool)
// 更新数据
Update(lcuuid string, updateInfo map[string]interface{}) (*MT, bool)
// 批量更新数据
UpdateBatch([]*MT, map[string]map[string]interface{}) ([]*MT, bool)
// 批量删除数据
DeleteBatch(lcuuids []string) ([]*MT, bool)

GetSoftDelete() bool
SetFieldsToUpdate([]string)
SetMetadata(*rcommon.Metadata)
}

type OperatorBase[MPT constraint.MySQLModelPtr[MT], MT constraint.MySQLModel] struct {
Expand All @@ -47,6 +53,7 @@ type OperatorBase[MPT constraint.MySQLModelPtr[MT], MT constraint.MySQLModel] st
softDelete bool
allocateID bool
fieldsNeededAfterCreate []string // fields needed to be used after create
fieldsToUpdate []string // fields needed to be updated
}

func newOperatorBase[MPT constraint.MySQLModelPtr[MT], MT constraint.MySQLModel](resourceTypeName string, softDelete, allocateID bool) OperatorBase[MPT, MT] {
Expand All @@ -57,9 +64,14 @@ func newOperatorBase[MPT constraint.MySQLModelPtr[MT], MT constraint.MySQLModel]
}
}

func (o *OperatorBase[MPT, MT]) SetMetadata(md *rcommon.Metadata) Operator[MPT, MT] {
func (o *OperatorBase[MPT, MT]) SetFieldsToUpdate(fs []string) {
o.fieldsToUpdate = fs
return
}

func (o *OperatorBase[MPT, MT]) SetMetadata(md *rcommon.Metadata) {
o.metadata = md
return o
return
}

func (o *OperatorBase[MPT, MT]) GetSoftDelete() bool {
Expand Down Expand Up @@ -117,6 +129,23 @@ func (o *OperatorBase[MPT, MT]) Update(lcuuid string, updateInfo map[string]inte
return dbItem, true
}

func (o *OperatorBase[MPT, MT]) UpdateBatch(dbItems []*MT, lcuuidToUpdateInfo map[string]map[string]interface{}) ([]*MT, bool) {
startTime := time.Now()
err := o.metadata.DB.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "lcuuid"}},
DoUpdates: clause.AssignmentColumns(o.fieldsToUpdate),
}).Create(&dbItems).Error
log.Infof("update batch cost: %v", time.Since(startTime))
if err != nil {
log.Errorf("%s batch failed: %v", rcommon.LogUpdate(o.resourceTypeName), err.Error(), o.metadata.LogPrefixes)
return nil, false
}
for lcuuid, updateInfo := range lcuuidToUpdateInfo {
log.Infof("%s (lcuuid: %s, detail: %+v) success", rcommon.LogUpdate(o.resourceTypeName), lcuuid, updateInfo, o.metadata.LogPrefixes)
}
return dbItems, true
}

func (o *OperatorBase[MPT, MT]) DeleteBatch(lcuuids []string) ([]*MT, bool) {
var deletedItems []*MT
err := o.metadata.DB.Where("lcuuid IN ?", lcuuids).Find(&deletedItems).Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,16 @@ type UpdatePtr[T Update] interface {

SetFields(interface{})
GetFields() interface{} // return *FieldsUpdate
SetDiffBase(interface{})
GetDiffBase() interface{} // return *constraint.DiffBase
SetCloudItem(interface{})
SetFieldsMap(map[string]interface{})
GetFieldsMap() map[string]interface{}
SetDiffBase(interface{}) // TODO remove this
GetDiffBase() interface{} // return *constraint.DiffBase
SetCloudItem(interface{}) // TODO remove this
GetCloudItem() interface{} // return *constraint.CloudModel
// SetNewMySQL(interface{}) // TODO: remove this
// GetNewMySQL() interface{}
// SetOldMySQL(interface{})
// GetOldMySQL() interface{}
}

// Update是所有资源更新消息的泛型约束
Expand Down
32 changes: 22 additions & 10 deletions server/controller/recorder/pubsub/message/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,27 @@ func (k *Key) GetLcuuid() string {
}

type Fields[T any] struct {
data *T
structData *T
mapData map[string]interface{}
}

func (f *Fields[T]) SetFields(data interface{}) {
f.data = data.(*T)
f.structData = data.(*T)
}

func (f *Fields[T]) GetFields() interface{} {
return f.data
return f.structData
}

type fieldDetail[T any] struct {
func (f *Fields[T]) SetFieldsMap(data map[string]interface{}) {
f.mapData = data
}

func (f *Fields[T]) GetFieldsMap() map[string]interface{} {
return f.mapData
}

type fieldDetail[T any] struct { // TODO add name
different bool
new T
old T
Expand Down Expand Up @@ -102,20 +111,20 @@ type MySQLData[MT constraint.MySQLModel] struct {
old *MT
}

func (m *MySQLData[MT]) GetNewMySQL() *MT {
func (m *MySQLData[MT]) GetNewMySQL() interface{} {
return m.new
}

func (m *MySQLData[MT]) SetNewMySQL(new *MT) {
m.new = new
func (m *MySQLData[MT]) SetNewMySQL(new interface{}) {
m.new = new.(*MT)
}

func (m *MySQLData[MT]) GetOldMySQL() *MT {
func (m *MySQLData[MT]) GetOldMySQL() interface{} {
return m.old
}

func (m *MySQLData[MT]) SetOldMySQL(old *MT) {
m.old = old
func (m *MySQLData[MT]) SetOldMySQL(old interface{}) {
m.old = old.(*MT)
}

type DiffBase[DT constraint.DiffBase] struct {
Expand Down Expand Up @@ -338,6 +347,7 @@ type VInterfaceFieldsUpdate struct {
Name fieldDetail[string]
TapMac fieldDetail[string]
Type fieldDetail[int]
DeviceID fieldDetail[int]
NetnsID fieldDetail[uint32]
VTapID fieldDetail[uint32]
NetworkID fieldDetail[int]
Expand Down Expand Up @@ -743,6 +753,8 @@ type ProcessFieldsUpdate struct {
Key
Name fieldDetail[string]
ContainerID fieldDetail[string]
DeviceType fieldDetail[int]
DeviceID fieldDetail[int]
OSAPPTags fieldDetail[string]
VMID fieldDetail[int]
VPCID fieldDetail[int]
Expand Down
19 changes: 18 additions & 1 deletion server/controller/recorder/updater/az.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ func NewAZ(wholeCache *cache.Cache, cloudData []cloudmodel.AZ) *AZ {
](
ctrlrcommon.RESOURCE_TYPE_AZ_EN,
wholeCache,
db.NewAZ().SetMetadata(wholeCache.GetMetadata()),
db.NewAZ(),
wholeCache.DiffBaseDataSet.AZs,
cloudData,
),
}
updater.dataGenerator = updater
updater.initDBOperator()
return updater
}

Expand All @@ -84,6 +85,10 @@ func (z *AZ) generateDBItemToAdd(cloudItem *cloudmodel.AZ) (*mysqlmodel.AZ, bool
return dbItem, true
}

func (z *AZ) getUpdateableFields() []string {
return []string{"name", "label", "region"}
}

func (z *AZ) generateUpdateInfo(diffBase *diffbase.AZ, cloudItem *cloudmodel.AZ) (*message.AZFieldsUpdate, map[string]interface{}, bool) {
structInfo := new(message.AZFieldsUpdate)
mapInfo := make(map[string]interface{})
Expand All @@ -102,3 +107,15 @@ func (z *AZ) generateUpdateInfo(diffBase *diffbase.AZ, cloudItem *cloudmodel.AZ)

return structInfo, mapInfo, len(mapInfo) > 0
}

func (z *AZ) setUpdatedFields(dbItem *mysqlmodel.AZ, updateInfo *message.AZFieldsUpdate) {
if updateInfo.Name.IsDifferent() {
dbItem.Name = updateInfo.Name.GetNew()
}
if updateInfo.Label.IsDifferent() {
dbItem.Label = updateInfo.Label.GetNew()
}
if updateInfo.RegionLcuuid.IsDifferent() {
dbItem.Region = updateInfo.RegionLcuuid.GetNew()
}
}
16 changes: 15 additions & 1 deletion server/controller/recorder/updater/cen.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ func NewCEN(wholeCache *cache.Cache, cloudData []cloudmodel.CEN) *CEN {
](
ctrlrcommon.RESOURCE_TYPE_CEN_EN,
wholeCache,
db.NewCEN().SetMetadata(wholeCache.GetMetadata()),
db.NewCEN(),
wholeCache.DiffBaseDataSet.CENs,
cloudData,
),
}
updater.dataGenerator = updater
updater.initDBOperator()
return updater
}

Expand Down Expand Up @@ -97,6 +98,10 @@ func (c *CEN) generateDBItemToAdd(cloudItem *cloudmodel.CEN) (*mysqlmodel.CEN, b
return dbItem, true
}

func (c *CEN) getUpdateableFields() []string {
return []string{"name", "epc_ids"}
}

func (c *CEN) generateUpdateInfo(diffBase *diffbase.CEN, cloudItem *cloudmodel.CEN) (*message.CENFieldsUpdate, map[string]interface{}, bool) {
structInfo := new(message.CENFieldsUpdate)
mapInfo := make(map[string]interface{})
Expand Down Expand Up @@ -124,3 +129,12 @@ func (c *CEN) generateUpdateInfo(diffBase *diffbase.CEN, cloudItem *cloudmodel.C

return structInfo, mapInfo, len(mapInfo) > 0
}

func (c *CEN) setUpdatedFields(dbItem *mysqlmodel.CEN, updateInfo *message.CENFieldsUpdate) {
if updateInfo.Name.IsDifferent() {
dbItem.Name = updateInfo.Name.GetNew()
}
if updateInfo.VPCIDs.IsDifferent() {
dbItem.VPCIDs = rcommon.IntSliceToString(updateInfo.VPCIDs.GetNew())
}
}
22 changes: 21 additions & 1 deletion server/controller/recorder/updater/dhcp_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ func NewDHCPPort(wholeCache *cache.Cache, cloudData []cloudmodel.DHCPPort) *DHCP
](
ctrlrcommon.RESOURCE_TYPE_DHCP_PORT_EN,
wholeCache,
db.NewDHCPPort().SetMetadata(wholeCache.GetMetadata()),
db.NewDHCPPort(),
wholeCache.DiffBaseDataSet.DHCPPorts,
cloudData,
),
}
updater.dataGenerator = updater
updater.initDBOperator()
return updater
}

Expand Down Expand Up @@ -93,6 +94,10 @@ func (p *DHCPPort) generateDBItemToAdd(cloudItem *cloudmodel.DHCPPort) (*mysqlmo
return dbItem, true
}

func (p *DHCPPort) getUpdateableFields() []string {
return []string{"epc_id", "name", "region", "az"}
}

func (p *DHCPPort) generateUpdateInfo(diffBase *diffbase.DHCPPort, cloudItem *cloudmodel.DHCPPort) (*message.DHCPPortFieldsUpdate, map[string]interface{}, bool) {
structInfo := new(message.DHCPPortFieldsUpdate)
mapInfo := make(map[string]interface{})
Expand Down Expand Up @@ -124,3 +129,18 @@ func (p *DHCPPort) generateUpdateInfo(diffBase *diffbase.DHCPPort, cloudItem *cl

return structInfo, mapInfo, len(mapInfo) > 0
}

func (p *DHCPPort) setUpdatedFields(dbItem *mysqlmodel.DHCPPort, updateInfo *message.DHCPPortFieldsUpdate) {
if updateInfo.VPCID.IsDifferent() {
dbItem.VPCID = updateInfo.VPCID.GetNew()
}
if updateInfo.Name.IsDifferent() {
dbItem.Name = updateInfo.Name.GetNew()
}
if updateInfo.RegionLcuuid.IsDifferent() {
dbItem.Region = updateInfo.RegionLcuuid.GetNew()
}
if updateInfo.AZLcuuid.IsDifferent() {
dbItem.AZ = updateInfo.AZLcuuid.GetNew()
}
}
16 changes: 15 additions & 1 deletion server/controller/recorder/updater/floating_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ func NewFloatingIP(wholeCache *cache.Cache, cloudData []cloudmodel.FloatingIP) *
](
ctrlrcommon.RESOURCE_TYPE_FLOATING_IP_EN,
wholeCache,
db.NewFloatingIP().SetMetadata(wholeCache.GetMetadata()),
db.NewFloatingIP(),
wholeCache.DiffBaseDataSet.FloatingIPs,
cloudData,
),
}
updater.dataGenerator = updater
updater.initDBOperator()
return updater
}

Expand Down Expand Up @@ -118,6 +119,10 @@ func (f *FloatingIP) generateDBItemToAdd(cloudItem *cloudmodel.FloatingIP) (*mys
return dbItem, true
}

func (f *FloatingIP) getUpdateableFields() []string {
return []string{"epc_id", "region"}
}

func (f *FloatingIP) generateUpdateInfo(diffBase *diffbase.FloatingIP, cloudItem *cloudmodel.FloatingIP) (*message.FloatingIPFieldsUpdate, map[string]interface{}, bool) {
structInfo := new(message.FloatingIPFieldsUpdate)
mapInfo := make(map[string]interface{})
Expand All @@ -140,3 +145,12 @@ func (f *FloatingIP) generateUpdateInfo(diffBase *diffbase.FloatingIP, cloudItem
}
return structInfo, mapInfo, len(mapInfo) > 0
}

func (f *FloatingIP) setUpdatedFields(dbItem *mysqlmodel.FloatingIP, updateInfo *message.FloatingIPFieldsUpdate) {
if updateInfo.VPCID.IsDifferent() {
dbItem.VPCID = updateInfo.VPCID.GetNew()
}
if updateInfo.RegionLcuuid.IsDifferent() {
dbItem.Region = updateInfo.RegionLcuuid.GetNew()
}
}
Loading

0 comments on commit ba586c0

Please sign in to comment.