Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(region): isolate device sync #21714

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ require (
k8s.io/cri-api v0.22.17
k8s.io/klog/v2 v2.20.0
moul.io/http2curl/v2 v2.3.0
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20241125120153-2a0e6362368b
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20241128072830-0574df058206
yunion.io/x/executor v0.0.0-20230705125604-c5ac3141db32
yunion.io/x/jsonutils v1.0.1-0.20240930100528-1671a2d0d22f
yunion.io/x/log v1.0.1-0.20240305175729-7cf2d6cd5a91
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1376,8 +1376,8 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20241125120153-2a0e6362368b h1:Usnw/w/qSTX5gLJYhSoMCeUifVxs32c4DNVvLA5RmEQ=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20241125120153-2a0e6362368b/go.mod h1:rj/pb3DitJlQaQD8UW1oxx/KD+PzDZqoywzqRJaFE9A=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20241128072830-0574df058206 h1:+rHMXfo83atsceTG3oZnOHCpI7LWBDdoB/I3GEND/ZM=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20241128072830-0574df058206/go.mod h1:rj/pb3DitJlQaQD8UW1oxx/KD+PzDZqoywzqRJaFE9A=
yunion.io/x/executor v0.0.0-20230705125604-c5ac3141db32 h1:v7POYkQwo1XzOxBoIoRVr/k0V9Y5JyjpshlIFa9raug=
yunion.io/x/executor v0.0.0-20230705125604-c5ac3141db32/go.mod h1:Uxuou9WQIeJXNpy7t2fPLL0BYLvLiMvGQwY7Qc6aSws=
yunion.io/x/jsonutils v0.0.0-20190625054549-a964e1e8a051/go.mod h1:4N0/RVzsYL3kH3WE/H1BjUQdFiWu50JGCFQuuy+Z634=
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/compute/isolated_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type IsolateDeviceDetails struct {

type IsolatedDeviceListInput struct {
apis.StandaloneResourceListInput
apis.ExternalizedResourceBaseListInput
apis.DomainizedResourceListInput

HostFilterListInput
Expand Down
103 changes: 103 additions & 0 deletions pkg/compute/models/cloudsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func syncRegionQuotas(ctx context.Context, userCred mcclient.TokenCredential, sy
return remoteRegion.GetICloudQuotas()
}()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return nil
}
msg := fmt.Sprintf("GetICloudQuotas for region %s failed %s", remoteRegion.GetName(), err)
log.Errorf(msg)
return err
Expand Down Expand Up @@ -209,6 +212,9 @@ func syncRegionEips(
return remoteRegion.GetIEips()
}()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return
}
msg := fmt.Sprintf("GetIEips for region %s failed %s", remoteRegion.GetName(), err)
log.Errorf(msg)
return
Expand Down Expand Up @@ -236,6 +242,9 @@ func syncRegionBuckets(ctx context.Context, userCred mcclient.TokenCredential, s
return remoteRegion.GetIBuckets()
}()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return
}
msg := fmt.Sprintf("GetIBuckets for region %s failed %s", remoteRegion.GetName(), err)
log.Errorf(msg)
return
Expand Down Expand Up @@ -272,6 +281,9 @@ func syncRegionVPCs(
return remoteRegion.GetIVpcs()
}()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return
}
msg := fmt.Sprintf("GetVpcs for region %s failed %s", remoteRegion.GetName(), err)
log.Errorf(msg)
return
Expand Down Expand Up @@ -977,6 +989,7 @@ func syncZoneHosts(
syncHostNics(ctx, userCred, syncResults, provider, &localHosts[i], remoteHosts[i])
// syncHostWires(ctx, userCred, syncResults, provider, &localHosts[i], remoteHosts[i])
syncHostVMs(ctx, userCred, syncResults, provider, driver, &localHosts[i], remoteHosts[i], syncRange)
syncHostIsolateDevices(ctx, userCred, syncResults, provider, driver, &localHosts[i], remoteHosts[i], syncRange)
}()
}
return newCachePairs
Expand Down Expand Up @@ -1102,6 +1115,33 @@ func syncHostVMs(ctx context.Context, userCred mcclient.TokenCredential, syncRes

}

func syncHostIsolateDevices(ctx context.Context, userCred mcclient.TokenCredential, syncResults SSyncResultSet, provider *SCloudprovider, driver cloudprovider.ICloudProvider, localHost *SHost, remoteHost cloudprovider.ICloudHost, syncRange *SSyncRange) {
devs, err := func() ([]cloudprovider.IsolateDevice, error) {
defer syncResults.AddRequestCost(HostManager)()
return remoteHost.GetIsolateDevices()
}()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return
}
msg := fmt.Sprintf("GetIsolateDevices for host %s failed %s", remoteHost.GetName(), err)
log.Errorf(msg)
return
}

result := func() compare.SyncResult {
defer syncResults.AddSqlCost(HostManager)()
return localHost.SyncHostIsolateDevices(ctx, userCred, driver, devs, provider.GetOwnerId(), syncRange.Xor)
}()

syncResults.Add(HostManager, result)

msg := result.Result()
notes := fmt.Sprintf("SyncHostIsolateDevices for host %s result: %s", localHost.Name, msg)
log.Infof(notes)
provider.SyncError(result, notes, userCred)
}

func syncVMPeripherals(
ctx context.Context,
userCred mcclient.TokenCredential,
Expand Down Expand Up @@ -1137,6 +1177,10 @@ func syncVMPeripherals(
if result.IsError() {
log.Errorf("syncVMInstanceSnapshots error %v", result.AllError())
}
err = syncVMIsolateDevices(ctx, userCred, provider, local, remote)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotSupported && errors.Cause(err) != cloudprovider.ErrNotImplemented {
logclient.AddSimpleActionLog(local, logclient.ACT_CLOUD_SYNC, errors.Wrapf(err, "syncVMIsolateDevice"), userCred, false)
}
}

func syncVMNics(
Expand Down Expand Up @@ -1206,6 +1250,38 @@ func syncVMSecgroups(ctx context.Context, userCred mcclient.TokenCredential, pro
return localVM.SyncVMSecgroups(ctx, userCred, secgroupIds)
}

func syncVMIsolateDevices(ctx context.Context, userCred mcclient.TokenCredential, provider *SCloudprovider, localVM *SGuest, remoteVM cloudprovider.ICloudVM) error {
devIds, err := remoteVM.GetIsolateDeviceIds()
if err != nil {
return errors.Wrap(err, "remoteVM.GetIsolateDeviceIds")
}
return localVM.SyncVMIsolateDevices(ctx, userCred, devIds)
}

func (self *SGuest) SyncVMIsolateDevices(ctx context.Context, userCred mcclient.TokenCredential, externalIds []string) error {
host, err := self.GetHost()
if err != nil {
return err
}
sq := HostManager.Query("id").Equals("manager_id", host.ManagerId).SubQuery()
q := IsolatedDeviceManager.Query().In("host_id", sq).In("external_id", externalIds)
devs := []SIsolatedDevice{}
err = db.FetchModelObjects(IsolatedDeviceManager, q, &devs)
if err != nil {
return err
}
for i := range devs {
_, err = db.Update(&devs[i], func() error {
devs[i].GuestId = self.Id
return nil
})
if err != nil {
return err
}
}
return nil
}

func syncSkusFromPrivateCloud(
ctx context.Context,
userCred mcclient.TokenCredential,
Expand Down Expand Up @@ -1981,6 +2057,9 @@ func syncRegionSnapshots(
return remoteRegion.GetISnapshots()
}()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return
}
msg := fmt.Sprintf("GetISnapshots for region %s failed %s", remoteRegion.GetName(), err)
log.Errorf(msg)
return
Expand Down Expand Up @@ -2016,6 +2095,9 @@ func syncRegionSnapshotPolicies(
return remoteRegion.GetISnapshotPolicies()
}()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return
}
log.Errorf("GetISnapshotPolicies for region %s failed %s", remoteRegion.GetName(), err)
return
}
Expand Down Expand Up @@ -2048,6 +2130,9 @@ func syncRegionNetworkInterfaces(
return remoteRegion.GetINetworkInterfaces()
}()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return
}
msg := fmt.Sprintf("GetINetworkInterfaces for region %s failed %s", remoteRegion.GetName(), err)
log.Errorf(msg)
return
Expand Down Expand Up @@ -2084,6 +2169,9 @@ func syncRegionNetworkInterfaces(
func syncInterfaceAddresses(ctx context.Context, userCred mcclient.TokenCredential, localInterface *SNetworkInterface, remoteInterface cloudprovider.ICloudNetworkInterface) {
addresses, err := remoteInterface.GetICloudInterfaceAddresses()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return
}
msg := fmt.Sprintf("GetICloudInterfaceAddresses for networkinterface %s failed %s", remoteInterface.GetName(), err)
log.Errorf(msg)
return
Expand Down Expand Up @@ -2811,6 +2899,9 @@ func syncGlobalVpcs(ctx context.Context, userCred mcclient.TokenCredential, sync
}
secgroups, err := remoteVpcs[i].GetISecurityGroups()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
continue
}
log.Errorf("GetISecurityGroup for global vpc %s error: %v", localVpcs[i].Name, err)
continue
}
Expand Down Expand Up @@ -2849,6 +2940,9 @@ func syncTablestore(
return remoteRegion.GetICloudTablestores()
}()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return nil
}
msg := fmt.Sprintf("GetICloudTablestores for region %s failed %s", remoteRegion.GetName(), err)
log.Errorf(msg)
return err
Expand Down Expand Up @@ -2879,6 +2973,9 @@ func syncModelartsPools(
) error {
ipools, err := remoteRegion.GetIModelartsPools()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return nil
}
msg := fmt.Sprintf("GetIModelartsPools for provider %s failed %s", err, ipools)
log.Errorf(msg)
return err
Expand All @@ -2901,6 +2998,9 @@ func syncModelartsPoolSkus(
) error {
ipools, err := remoteRegion.GetIModelartsPoolSku()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return nil
}
msg := fmt.Sprintf("GetIModelartsPoolSku for provider %s failed %s", err, ipools)
log.Errorf(msg)
return err
Expand All @@ -2923,6 +3023,9 @@ func syncMiscResources(
) error {
exts, err := remoteRegion.GetIMiscResources()
if err != nil {
if errors.Cause(err) == cloudprovider.ErrNotImplemented || errors.Cause(err) == cloudprovider.ErrNotSupported {
return nil
}
msg := fmt.Sprintf("GetIMiscResources for provider %s failed %v", provider.Name, err)
log.Errorf(msg)
return err
Expand Down
88 changes: 88 additions & 0 deletions pkg/compute/models/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2632,6 +2632,94 @@ func (self *SGuest) Purge(ctx context.Context, userCred mcclient.TokenCredential
return self.purge(ctx, userCred)
}

func (hh *SHost) GetIsolateDevices() ([]SIsolatedDevice, error) {
q := IsolatedDeviceManager.Query().Equals("host_id", hh.Id)
ret := []SIsolatedDevice{}
err := db.FetchModelObjects(IsolatedDeviceManager, q, &ret)
if err != nil {
return nil, err
}
return ret, nil
}

func (hh *SHost) SyncHostIsolateDevices(ctx context.Context, userCred mcclient.TokenCredential, iprovider cloudprovider.ICloudProvider, devs []cloudprovider.IsolateDevice, syncOwnerId mcclient.IIdentityProvider, xor bool) compare.SyncResult {
lockman.LockRawObject(ctx, IsolatedDeviceManager.Keyword(), hh.Id)
defer lockman.ReleaseRawObject(ctx, IsolatedDeviceManager.Keyword(), hh.Id)

result := compare.SyncResult{}

dbDevs, err := hh.GetIsolateDevices()
if err != nil {
result.Error(errors.Wrapf(err, "GetIsolateDevices"))
return result
}

removed := make([]SIsolatedDevice, 0)
commondb := make([]SIsolatedDevice, 0)
commonext := make([]cloudprovider.IsolateDevice, 0)
added := make([]cloudprovider.IsolateDevice, 0)
duplicated := make(map[string][]cloudprovider.IsolateDevice)

err = compare.CompareSets2(dbDevs, devs, &removed, &commondb, &commonext, &added, &duplicated)
if err != nil {
result.Error(err)
return result
}

for i := 0; i < len(removed); i += 1 {
err := removed[i].Delete(ctx, userCred)
if err != nil {
result.DeleteError(err)
continue
}
result.Delete()
}

if !xor {
for i := 0; i < len(commondb); i += 1 {
err := commondb[i].syncWithCloudIsolateDevice(ctx, userCred, commonext[i])
if err != nil {
result.UpdateError(err)
continue
}
result.Update()
}
}

for i := 0; i < len(added); i += 1 {
err := hh.newIsolateDevice(ctx, userCred, added[i])
if err != nil {
result.AddError(err)
continue
}
result.Add()
}

if len(duplicated) > 0 {
errs := make([]error, 0)
for k, vms := range duplicated {
errs = append(errs, errors.Wrapf(errors.ErrDuplicateId, "Duplicate Id %s (%d)", k, len(vms)))
}
result.AddError(errors.NewAggregate(errs))
}

return result
}

func (hh *SHost) newIsolateDevice(ctx context.Context, userCred mcclient.TokenCredential, dev cloudprovider.IsolateDevice) error {
ret := &SIsolatedDevice{}
ret.SetModelManager(IsolatedDeviceManager, ret)
ret.HostId = hh.Id
ret.ExternalId = dev.GetGlobalId()
ret.Name = dev.GetName()
ret.Model = dev.GetModel()
ret.Addr = dev.GetAddr()
ret.DevType = dev.GetDevType()
ret.NumaNode = dev.GetNumaNode()
ret.VendorDeviceId = dev.GetVendorDeviceId()
return IsolatedDeviceManager.TableSpec().Insert(ctx, ret)
}

func (hh *SHost) SyncHostVMs(ctx context.Context, userCred mcclient.TokenCredential, iprovider cloudprovider.ICloudProvider, vms []cloudprovider.ICloudVM, syncOwnerId mcclient.IIdentityProvider, xor bool) ([]SGuestSyncResult, compare.SyncResult) {
lockman.LockRawObject(ctx, GuestManager.Keyword(), hh.Id)
defer lockman.ReleaseRawObject(ctx, GuestManager.Keyword(), hh.Id)
Expand Down
Loading