Skip to content

Commit

Permalink
Merge pull request #18956 from swordqiu/automated-cherry-pick-of-#18955-upstream-master
Browse files (browse the repository at this point in the history)

Automated cherry pick of #18955: fix: log metric collection action
  • Loading branch information
zexi authored Dec 11, 2023
2 parents 89b414a + 9875d24 commit a59508a
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 44 deletions.
2 changes: 1 addition & 1 deletion pkg/cloudmon/misc/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func PingProbe(ctx context.Context, userCred mcclient.TokenCredential, isStart b
network := sNetwork{networks[i]}
m, err := pingProbeNetwork(s, network)
if err != nil {
log.Errorf("pingProbeNetwork")
log.Errorf("pingProbeNetwork network %s(%s-%s) fail %s", network.Name, network.GuestIpStart, network.GuestIpEnd, err)
continue
}
metrics = append(metrics, m...)
Expand Down
184 changes: 141 additions & 43 deletions pkg/cloudmon/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"yunion.io/x/cloudmux/pkg/cloudprovider"
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/appctx"
"yunion.io/x/pkg/errors"

api "yunion.io/x/onecloud/pkg/apis/compute"
Expand All @@ -36,6 +37,7 @@ import (
"yunion.io/x/onecloud/pkg/mcclient/modules/compute"
"yunion.io/x/onecloud/pkg/mcclient/modules/identity"
"yunion.io/x/onecloud/pkg/util/influxdb"
"yunion.io/x/onecloud/pkg/util/logclient"
)

type sBaseInfo struct {
Expand Down Expand Up @@ -632,14 +634,30 @@ func (self *SResources) UpdateSync(ctx context.Context, userCred mcclient.TokenC
}
}

func (self *SResources) CollectMetrics(ctx context.Context, userCred mcclient.TokenCredential, taskStartTime time.Time, isStart bool) {
// sMetricProvider embeds api.CloudproviderDetails and carries the
// GetId/GetName/Keyword methods declared on it in this file. CollectMetrics
// passes a pointer to it as the object argument of
// logclient.AddActionLogWithContext when recording a collect-metrics action.
// NOTE(review): presumably those three methods are what the logclient object
// parameter requires — confirm against the logclient package.
type sMetricProvider struct {
api.CloudproviderDetails
}

// GetId returns the Id field of the embedded cloud provider details.
func (p sMetricProvider) GetId() string {
	// Spell out the embedded struct instead of relying on field promotion.
	return p.CloudproviderDetails.Id
}

// GetName returns the Name field of the embedded cloud provider details.
func (p sMetricProvider) GetName() string {
	// Spell out the embedded struct instead of relying on field promotion.
	return p.CloudproviderDetails.Name
}

// Keyword returns the fixed resource keyword "cloudprovider"; the value does
// not depend on the receiver's contents.
func (p sMetricProvider) Keyword() string {
	const keyword = "cloudprovider"
	return keyword
}

func (res *SResources) CollectMetrics(ctx context.Context, userCred mcclient.TokenCredential, taskStartTime time.Time, isStart bool) {
if isStart {
return
}
ch := make(chan struct{}, options.Options.CloudAccountCollectMetricsBatchCount)
defer close(ch)
s := auth.GetAdminSession(context.Background(), options.Options.Region)
resources := self.Cloudproviders.getResources(ctx, "")
s := auth.GetAdminSession(ctx, options.Options.Region)
resources := res.Cloudproviders.getResources(ctx, "")
cloudproviders := map[string]api.CloudproviderDetails{}
jsonutils.Update(&cloudproviders, resources)
az, _ := time.LoadLocation(options.Options.TimeZone)
Expand All @@ -649,194 +667,274 @@ func (self *SResources) CollectMetrics(ctx context.Context, userCred mcclient.To
for i := range cloudproviders {
ch <- struct{}{}
wg.Add(1)
go func(manager api.CloudproviderDetails) {
goctx := context.WithValue(ctx, appctx.APP_CONTEXT_KEY_START_TIME, time.Now().UTC())
go func(ctx context.Context, manager api.CloudproviderDetails) {
succ := true
msgs := make([]string, 0)
defer func() {
logclient.AddActionLogWithContext(ctx, &sMetricProvider{manager}, logclient.ACT_COLLECT_METRICS, strings.Join(msgs, ";"), userCred, succ)
wg.Done()
<-ch
}()

if strings.Contains(strings.ToLower(options.Options.SkipMetricPullProviders), strings.ToLower(manager.Provider)) {
log.Infof("skip %s metric pull with options: %s", manager.Provider, options.Options.SkipMetricPullProviders)
logmsg := fmt.Sprintf("skip %s metric pull with options: %s", manager.Provider, options.Options.SkipMetricPullProviders)
log.Infoln(logmsg)
msgs = append(msgs, logmsg)
return
}

driver, err := providerdriver.GetDriver(manager.Provider)
if err != nil {
log.Errorf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
return
}

if !driver.IsSupportMetrics() {
log.Infof("%s not support metrics, skip", driver.GetProvider())
logmsg := fmt.Sprintf("%s not support metrics, skip", driver.GetProvider())
log.Infoln(logmsg)
msgs = append(msgs, logmsg)
return
}

provider, err := compute.Cloudproviders.GetProvider(ctx, s, manager.Id)
if err != nil {
log.Errorf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("failed get provider %s(%s) driver %v", manager.Name, manager.Provider, err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
return
}
duration := driver.GetDelayDuration()
endTime := _endTime.Add(-1 * duration)
startTime := _startTime.Add(-1 * duration).Add(time.Second * -59)

resources = self.DBInstances.getResources(ctx, manager.Id)
resources = res.DBInstances.getResources(ctx, manager.Id)
dbinstances := map[string]api.DBInstanceDetails{}
err = jsonutils.Update(&dbinstances, resources)
if err != nil {
log.Errorf("unmarsha rds resources error: %v", err)
logmsg := fmt.Sprintf("unmarshal rds resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
if len(dbinstances) > 0 {
err = driver.CollectDBInstanceMetrics(ctx, manager, provider, dbinstances, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectDBInstanceMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectDBInstanceMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

resources = self.Servers.getResources(ctx, manager.Id)
resources = res.Servers.getResources(ctx, manager.Id)
servers := map[string]api.ServerDetails{}
err = jsonutils.Update(&servers, resources)
if err != nil {
log.Errorf("unmarsha server resources error: %v", err)
logmsg := fmt.Sprintf("unmarsha server resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}

if len(servers) > 0 {
err = driver.CollectServerMetrics(ctx, manager, provider, servers, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectServerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectServerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorf(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

resources = self.Hosts.getResources(ctx, manager.Id)
resources = res.Hosts.getResources(ctx, manager.Id)
hosts := map[string]api.HostDetails{}
err = jsonutils.Update(&hosts, resources)
if err != nil {
log.Errorf("unmarsha host resources error: %v", err)
logmsg := fmt.Sprintf("unmarsha host resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}

if len(hosts) > 0 {
err = driver.CollectHostMetrics(ctx, manager, provider, hosts, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectHostMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectHostMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

resources = self.Storages.getResources(ctx, manager.Id)
resources = res.Storages.getResources(ctx, manager.Id)
storages := map[string]api.StorageDetails{}
err = jsonutils.Update(&storages, resources)
if err != nil {
log.Errorf("unmarsha storage resources error: %v", err)
logmsg := fmt.Sprintf("unmarsha storage resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
if len(storages) > 0 {
err = driver.CollectStorageMetrics(ctx, manager, provider, storages, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectStorageMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectStorageMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

resources = self.Redis.getResources(ctx, manager.Id)
resources = res.Redis.getResources(ctx, manager.Id)
caches := map[string]api.ElasticcacheDetails{}
err = jsonutils.Update(&caches, resources)
if err != nil {
log.Errorf("unmarsha redis resources error: %v", err)
logmsg := fmt.Sprintf("unmarsha redis resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}

if len(caches) > 0 {
err = driver.CollectRedisMetrics(ctx, manager, provider, caches, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectRedisMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectRedisMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorf(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

resources = self.Loadbalancers.getResources(ctx, manager.Id)
resources = res.Loadbalancers.getResources(ctx, manager.Id)
lbs := map[string]api.LoadbalancerDetails{}
err = jsonutils.Update(&lbs, resources)
if err != nil {
log.Errorf("unmarsha lb resources error: %v", err)
logmsg := fmt.Sprintf("unmarsha lb resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}

if len(lbs) > 0 {
err = driver.CollectLoadbalancerMetrics(ctx, manager, provider, lbs, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectLoadbalancerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectLoadbalancerMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorf(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

resources = self.Buckets.getResources(ctx, manager.Id)
resources = res.Buckets.getResources(ctx, manager.Id)
buckets := map[string]api.BucketDetails{}
err = jsonutils.Update(&buckets, resources)
if err != nil {
log.Errorf("unmarsha bucket resources error: %v", err)
logmsg := fmt.Sprintf("unmarsha bucket resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}

if len(buckets) > 0 {
err = driver.CollectBucketMetrics(ctx, manager, provider, buckets, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectBucketMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectBucketMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

resources = self.KubeClusters.getResources(ctx, manager.Id)
resources = res.KubeClusters.getResources(ctx, manager.Id)
clusters := map[string]api.KubeClusterDetails{}
err = jsonutils.Update(&clusters, resources)
if err != nil {
log.Errorf("unmarsha k8s resources error: %v", err)
logmsg := fmt.Sprintf("unmarsha k8s resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}

if len(clusters) > 0 {
err = driver.CollectK8sMetrics(ctx, manager, provider, clusters, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectK8sMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectK8sMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

resources = self.ModelartsPool.getResources(ctx, manager.Id)
resources = res.ModelartsPool.getResources(ctx, manager.Id)
pools := map[string]api.ModelartsPoolDetails{}
err = jsonutils.Update(&pools, resources)
if err != nil {
log.Errorf("unmarsha modelarts resources error: %v", err)
logmsg := fmt.Sprintf("unmarsha modelarts resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}

if len(pools) > 0 {
err = driver.CollectModelartsPoolMetrics(ctx, manager, provider, pools, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectModelartsPoolMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectModelartsPoolMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

resources = self.Wires.getResources(ctx, manager.Id)
resources = res.Wires.getResources(ctx, manager.Id)
wires := map[string]api.WireDetails{}
err = jsonutils.Update(&wires, resources)
if err != nil {
log.Errorf("unmarsha wires resources error: %v", err)
logmsg := fmt.Sprintf("unmarsha wires resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}

if len(wires) > 0 {
err = driver.CollectWireMetrics(ctx, manager, provider, wires, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectWireMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectWireMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

resources = self.ElasticIps.getResources(ctx, manager.Id)
resources = res.ElasticIps.getResources(ctx, manager.Id)
eips := map[string]api.ElasticipDetails{}
err = jsonutils.Update(&eips, resources)
if err != nil {
log.Errorf("unmarsha eips resources error: %v", err)
logmsg := fmt.Sprintf("unmarsha eips resources error: %v", err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}

if len(eips) > 0 {
err = driver.CollectEipMetrics(ctx, manager, provider, eips, startTime, endTime)
if err != nil && errors.Cause(err) != cloudprovider.ErrNotImplemented && errors.Cause(err) != cloudprovider.ErrNotSupported {
log.Errorf("CollectEipMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
logmsg := fmt.Sprintf("CollectEipMetrics for %s(%s) error: %v", manager.Name, manager.Provider, err)
log.Errorln(logmsg)
msgs = append(msgs, logmsg)
succ = false
}
}

}(cloudproviders[i])
}(goctx, cloudproviders[i])
}
wg.Wait()

resources = self.Cloudaccounts.getResources(ctx, "")
resources = res.Cloudaccounts.getResources(ctx, "")
accounts := map[string]api.CloudaccountDetail{}
jsonutils.Update(&accounts, resources)

Expand Down
2 changes: 2 additions & 0 deletions pkg/util/logclient/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,6 @@ const (
ACT_DELETE_SECURITY_GROUP_RULE = "delete_security_group_rule"

ACT_CLEAN_PROJECT = "clean_project"

ACT_COLLECT_METRICS = "collect_metrics"
)
5 changes: 5 additions & 0 deletions pkg/util/logclient/consts_i18n.go
Original file line number Diff line number Diff line change
Expand Up @@ -1450,4 +1450,9 @@ func init() {
EN("Sync Cloud Resource").
CN("同步云资源"),
)

o.Set(ACT_COLLECT_METRICS, i18n.NewTableEntry().
EN("Collect monitoring metrics").
CN("采集监控指标"),
)
}

0 comments on commit a59508a

Please sign in to comment.