[review] Address minor comments
FlorianVeaux committed Jan 7, 2020
1 parent d3cf7f6 commit 388d977
Showing 4 changed files with 38 additions and 23 deletions.
vsphere/datadog_checks/vsphere/constants.py (2 additions, 0 deletions)
@@ -35,6 +35,8 @@
 HISTORICAL_RESOURCES = [vim.Datacenter, vim.Datastore, vim.ClusterComputeResource]
 ALL_RESOURCES_WITH_METRICS = REALTIME_RESOURCES + HISTORICAL_RESOURCES
 
+REALTIME_METRICS_INTERVAL_ID = 20
+
 DEFAULT_BATCH_COLLECTOR_SIZE = 500
 DEFAULT_METRICS_PER_QUERY = 500
 UNLIMITED_HIST_METRICS_PER_QUERY = float('inf')
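The constant replaces a magic number: vSphere samples real-time performance counters at a fixed 20-second interval, so realtime query specs must pass intervalId=20. A minimal sketch of how the constant is consumed when building a pyVmomi query spec (make_realtime_query_spec is an illustrative helper, not part of the check; the real call site is in the collect_metrics hunk below):

    from pyVmomi import vim

    REALTIME_METRICS_INTERVAL_ID = 20  # vSphere's fixed real-time sampling interval, in seconds

    def make_realtime_query_spec(mor, metric_ids):
        # Ask for the single most recent real-time sample of one managed object.
        spec = vim.PerformanceManager.QuerySpec()
        spec.entity = mor  # e.g. a vim.VirtualMachine or vim.HostSystem
        spec.metricId = metric_ids
        spec.intervalId = REALTIME_METRICS_INTERVAL_ID
        spec.maxSample = 1  # request a single datapoint
        return spec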
vsphere/datadog_checks/vsphere/metrics.py (5 additions, 5 deletions)
@@ -256,7 +256,7 @@
     'virtualDisk.writeIOSize.latest': (4, 4, True),
     'virtualDisk.writeLatencyUS.latest': (4, 4, True),
     'virtualDisk.writeLoadMetric.latest': (2, 2, True),
-    'virtualDisk.writeOIO.latest': (2, 2, 'instance'),
+    'virtualDisk.writeOIO.latest': (2, 2, True),
 }
 
 # All metrics that can be collected from ESXi Hosts.
@@ -530,7 +530,7 @@
     'sys.resourceMemZero.latest': (3, 3, True),
     'sys.uptime.latest': (1, 3),
     'virtualDisk.busResets.sum': (2, 4, True),
-    'virtualDisk.commandsAborted.sum': (2, 4, 'instance'),
+    'virtualDisk.commandsAborted.sum': (2, 4, True),
 }
 
 # All metrics that can be collected from Datastores.
@@ -552,7 +552,7 @@
     'disk.numberWriteAveraged.avg': (1, 3),
     'disk.provisioned.latest': (1, 1, True),
     'disk.unshared.latest': (1, 1, True),
-    'disk.used.latest': (1, 1, 'instance'),
+    'disk.used.latest': (1, 1, True),
 }
 
 # All metrics that can be collected from Datacenters.
@@ -578,7 +578,7 @@
     'vmop.numSuspend.latest': (1, 3),
     'vmop.numUnregister.latest': (1, 3),
     'vmop.numVMotion.latest': (1, 3),
-    'vmop.numXVMotion.latest': (1, 3, 'aggregate'),
+    'vmop.numXVMotion.latest': (1, 3),
 }
 
 # All metrics that can be collected from Clusters.
@@ -619,7 +619,7 @@
     'vmop.numSuspend.latest': (1, 3),
     'vmop.numUnregister.latest': (1, 3),
     'vmop.numVMotion.latest': (1, 3),
-    'vmop.numXVMotion.latest': (1, 3, 'aggregate'),
+    'vmop.numXVMotion.latest': (1, 3),
 }
 
 ALLOWED_METRICS_FOR_MOR = {
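All five value changes normalize the optional third element of a metric entry: the legacy 'instance'/'aggregate' strings become a plain boolean per-instance flag, or disappear entirely when only the aggregate value is collected. A hedged sketch of how such a flag can be read, assuming entries are (level, level, per_instance) tuples whose first two integers are collection levels (an assumption, this diff only touches the third element); is_per_instance is illustrative, the check's own helper is should_collect_per_instance_values:

    METRICS = {
        'virtualDisk.writeOIO.latest': (2, 2, True),  # per-instance values allowed
        'vmop.numXVMotion.latest': (1, 3),            # aggregate value only
    }

    def is_per_instance(metric_name, metrics_table):
        # The third element is optional; a missing one means aggregate-only.
        entry = metrics_table.get(metric_name)
        return entry is not None and len(entry) > 2 and bool(entry[2])

    assert is_per_instance('virtualDisk.writeOIO.latest', METRICS)
    assert not is_per_instance('vmop.numXVMotion.latest', METRICS)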
vsphere/datadog_checks/vsphere/utils.py (6 additions, 4 deletions)
@@ -17,7 +17,7 @@ def format_metric_name(counter):
 
 def match_any_regex(string, regexes):
     for regex in regexes:
-        match = regex.match(string)  # FIXME: Should we use re.IGNORECASE like legacy?
+        match = regex.match(string)
         if match:
             return True
     return False
@@ -26,8 +26,10 @@ def match_any_regex(string, regexes):
 def is_resource_excluded_by_filters(mor, infrastructure_data, resource_filters):
     resource_type = MOR_TYPE_AS_STRING[type(mor)]
 
-    if not [f for f in resource_filters if f[0] == resource_type]:
-        # No filter for this resource, collect everything
+    for f in resource_filters:
+        if f[0] == resource_type:
+            break
+    else:
         return False
 
     name_filter = resource_filters.get((resource_type, 'name'))
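The rewrite replaces a throwaway list comprehension with Python's for/else: the else clause runs only when the loop completes without break, and the loop now stops at the first filter matching the resource type instead of scanning them all. A self-contained demo of the control flow:

    filters = [('vm', 'name'), ('host', 'name')]

    for f in filters:
        if f[0] == 'cluster':
            break
    else:
        # Reached only because no 'cluster' filter exists.
        print('no filter for this resource type, collect everything')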
@@ -91,9 +93,9 @@ def get_parent_tags_recursively(mor, infrastructure_data):
     """
     mor_props = infrastructure_data.get(mor)
     parent = mor_props.get('parent')
-    parent_props = infrastructure_data.get(parent, {})
     if parent:
         tags = []
+        parent_props = infrastructure_data.get(parent, {})
         parent_name = ensure_unicode(parent_props.get('name', 'unknown'))
         if isinstance(parent, vim.HostSystem):
             tags.append(u'vsphere_host:{}'.format(parent_name))
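Moving the parent_props lookup inside the if defers the cache access until a parent is known to exist, so objects at the root of the hierarchy skip it entirely. A hedged sketch of the function's overall recursive pattern (tag names and structure simplified, not the check's exact code):

    def parent_tags(mor, infrastructure_data):
        # Walk up the 'parent' chain, emitting one tag per ancestor.
        props = infrastructure_data.get(mor, {})
        parent = props.get('parent')
        if not parent:
            return []
        parent_props = infrastructure_data.get(parent, {})  # looked up only when needed
        name = parent_props.get('name', 'unknown')
        return [u'vsphere_parent:{}'.format(name)] + parent_tags(parent, infrastructure_data)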
vsphere/datadog_checks/vsphere/vsphere.py (25 additions, 14 deletions)
@@ -1,6 +1,8 @@
 # (C) Datadog, Inc. 2019
 # All rights reserved
 # Licensed under Simplified BSD License (see LICENSE)
+from __future__ import division
+
 import re
 import time
 from collections import defaultdict
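On Python 2 the future import switches / from truncating to true division for the whole module; it is what makes the later change to value /= 100 safe when value is an integer counter:

    from __future__ import division

    # Without the import, Python 2 evaluates 7 / 100 as 0 (floor division).
    # With it, / behaves like Python 3's true division:
    value = 7
    value /= 100
    print(value)  # 0.07 on both Python 2 and 3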
@@ -23,6 +25,7 @@
     EXTRA_FILTER_PROPERTIES_FOR_VMS,
     HISTORICAL_RESOURCES,
     MAX_QUERY_METRICS_OPTION,
+    REALTIME_METRICS_INTERVAL_ID,
     REALTIME_RESOURCES,
 )
 from datadog_checks.vsphere.legacy.event import VSphereEvent
@@ -54,7 +57,7 @@ def __new__(cls, name, init_config, instances):
         return super(VSphereCheck, cls).__new__(cls)
 
     def __init__(self, name, init_config, instances):
-        AgentCheck.__init__(self, name, init_config, instances)
+        super(VSphereCheck, self).__init__(name, init_config, instances)
         # Configuration fields
         self.base_tags = self.instance.get("tags", [])
         self.collection_level = self.instance.get("collection_level", 1)
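Calling super() instead of naming AgentCheck directly dispatches through the MRO, so a mixin inserted between VSphereCheck and AgentCheck would still be initialized exactly once; it also mirrors the super() call already used in __new__. A standalone illustration:

    class Base(object):
        def __init__(self):
            print('Base')

    class Mixin(Base):
        def __init__(self):
            print('Mixin')
            super(Mixin, self).__init__()

    class Check(Mixin):
        def __init__(self):
            # Base.__init__(self) would skip Mixin.__init__ entirely;
            # super() visits the MRO in order: Mixin, then Base.
            super(Check, self).__init__()

    Check()  # prints Mixin, then Base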
@@ -135,7 +138,7 @@ def validate_and_format_config(self):
             if f['property'] not in allowed_prop_names:
                 self.log.warning(
                     u"Ignoring filter %r because property '%s' is not valid "
-                    "for resource type %s. Should be one of %r.",
+                    u"for resource type %s. Should be one of %r.",
                     f,
                     f['property'],
                     f['resource'],
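This change is cosmetic: adjacent string literals are concatenated at compile time, and on Python 2 a unicode piece next to a str piece already yields unicode, so the added u prefix only makes the intent explicit and consistent:

    # Both forms produce the same unicode string on Python 2:
    a = u"Ignoring filter because property is not valid " "for resource type X."
    b = u"Ignoring filter because property is not valid " u"for resource type X."
    assert a == b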
@@ -166,7 +169,7 @@ def initiate_connection(self):
             self.api = VSphereAPI(self.instance)
             self.log.info("Connected")
         except APIConnectionError:
-            self.log.exception("Cannot authenticate to vCenter API. The check will not run.")
+            self.log.error("Cannot authenticate to vCenter API. The check will not run.")
             self.service_check(SERVICE_CHECK_NAME, AgentCheck.CRITICAL, tags=self.base_tags, hostname=None)
             raise
 
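log.exception records the message plus the active traceback, while log.error records the message alone; since the handler re-raises, the traceback still surfaces upstream, and switching to log.error avoids logging it twice. A minimal demo of the difference:

    import logging

    logging.basicConfig()
    log = logging.getLogger('demo')

    try:
        raise ValueError('bad credentials')
    except ValueError:
        log.error('Cannot authenticate.')      # message only
        log.exception('Cannot authenticate.')  # message plus full traceback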
@@ -301,7 +304,7 @@ def submit_metrics_callback(self, task):
                 value = valid_values[-1]
                 if metric_name in PERCENT_METRICS:
                     # Convert the percentage to a float.
-                    value = float(value) / 100
+                    value /= 100
 
                 tags = []
                 if should_collect_per_instance_values(metric_name, resource_type):
@@ -334,10 +337,8 @@ def query_metrics_wrapper(self, query_specs):
         self.histogram('datadog.vsphere.query_metrics.time', t0.total(), tags=self.base_tags, raw=True)
         return metrics_values
 
-    def collect_metrics(self):
+    def collect_metrics(self, thread_pool):
         """Creates a pool of threads and run the query_metrics calls in parallel."""
-        pool_executor = ThreadPoolExecutor(max_workers=self.threads_count)
-        self.log.info("Starting metric collection in %d threads." % self.threads_count)
         tasks = []
         for resource_type in self.collected_resource_types:
             mors = self.infrastructure_cache.get_mors(resource_type)
@@ -356,15 +357,15 @@ def collect_metrics(self):
                     query_spec.entity = mor
                     query_spec.metricId = metrics
                     if resource_type in REALTIME_RESOURCES:
-                        query_spec.intervalId = 20  # FIXME: Make constant
+                        query_spec.intervalId = REALTIME_METRICS_INTERVAL_ID
                         query_spec.maxSample = 1  # Request a single datapoint
                     else:
                         # We cannot use `maxSample` for historical metrics, let's specify a timewindow that will
                         # contain at least one element
                         query_spec.startTime = datetime.now() - timedelta(hours=2)
                     query_specs.append(query_spec)
                 if query_specs:
-                    tasks.append(pool_executor.submit(lambda q: self.query_metrics_wrapper(q), query_specs))
+                    tasks.append(thread_pool.submit(lambda q: self.query_metrics_wrapper(q), query_specs))
 
         self.log.info("Queued all %d tasks, waiting for completion.", len(tasks))
         while tasks:
@@ -373,12 +374,15 @@
                 time.sleep(0.1)
                 continue
             for task in finished_tasks:
-                self.submit_metrics_callback(task)
+                try:
+                    self.submit_metrics_callback(task)
+                except Exception:
+                    self.log.exception(
+                        "Exception raised during the submit_metrics_callback. "
+                        "Ignoring the error and continuing execution."
+                    )
                 tasks.remove(task)
 
-        self.log.info("All tasks completed, shutting down the thread pool.")
-        pool_executor.shutdown()
-
     def make_batch(self, mors, metric_ids, resource_type):
         """Iterates over mor and generate batches with a fixed number of metrics to query.
         Querying multiple resource types in the same call is error prone if we query a cluster metric. Indeed,
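The new try/except matters because a Future stores any exception raised in its worker thread and re-raises it when result() is called, which the callback presumably does via task.result(); without the guard, one failed query batch would abort the entire collection loop. A self-contained illustration:

    from concurrent.futures import ThreadPoolExecutor

    def flaky(n):
        if n == 2:
            raise RuntimeError('query failed')
        return n * n

    pool = ThreadPoolExecutor(max_workers=2)
    tasks = [pool.submit(flaky, n) for n in range(4)]
    for task in tasks:
        try:
            print(task.result())  # re-raises the worker's exception here
        except Exception as e:
            print('ignored: %s' % e)  # one bad task does not stop the others
    pool.shutdown()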
@@ -513,4 +517,11 @@ def check(self, _):
         vm_count = len(self.infrastructure_cache.get_mors(vim.VirtualMachine))
         self.gauge('vm.count', vm_count, tags=self.base_tags, hostname=None)
 
-        self.collect_metrics()
+        # Creating a thread pool and starting metric collection
+        pool_executor = ThreadPoolExecutor(max_workers=self.threads_count)
+        self.log.info("Starting metric collection in %d threads." % self.threads_count)
+        try:
+            self.collect_metrics(pool_executor)
+            self.log.info("All tasks completed, shutting down the thread pool.")
+        finally:
+            pool_executor.shutdown()
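The pool's lifecycle now belongs to check(): it is created there, handed to collect_metrics, and shut down in a finally block, so worker threads are released even when collection raises. A minimal sketch of the ownership pattern:

    import time
    from concurrent.futures import ThreadPoolExecutor

    def collect(pool):
        # The callee only submits work; it never owns the pool.
        tasks = [pool.submit(time.sleep, 0.01) for _ in range(5)]
        while tasks:
            finished = [t for t in tasks if t.done()]
            if not finished:
                time.sleep(0.1)
                continue
            for t in finished:
                tasks.remove(t)

    pool = ThreadPoolExecutor(max_workers=4)
    try:
        collect(pool)
    finally:
        pool.shutdown()  # runs even if collect() raised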
