Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dogstreams.d for dynamic dogstreams configuration #1557

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
Changes
=======

# 5.4.0 / Unreleased
### Details
https://github.com/DataDog/dd-agent/compare/5.3.0...5.4.0

### Changes
* [FEATURE] dogstreams config directive now supports wildcards in paths. [#753][] & [#1550][].
* [FEATURE] Add a dogstreams.d directory which may contain YAML files. Makes
dynamically configuring dogstreams easier.

# 5.3.0 / 04-16-2015
### Details
https://github.com/DataDog/dd-agent/compare/5.2.2...5.3.0
Expand Down
93 changes: 77 additions & 16 deletions checks/datadog.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# stdlib
import glob
import os
import sys
import traceback
Expand All @@ -11,7 +12,10 @@
import modules
from checks import LaconicFilter
from checks.utils import TailFile
from util import windows_friendly_colon_split
from util import windows_friendly_colon_split, yLoader

# 3rd party
import yaml

if hasattr('some string', 'partition'):
def partition(s, sep):
Expand All @@ -35,23 +39,40 @@ class EventDefaults(object):
class Dogstreams(object):
@classmethod
def init(cls, logger, config):
dogstreams_config = config.get('dogstreams', None)
dogstreams = []
if dogstreams_config:
# Expecting dogstreams config value to look like:
# <dogstream value>, <dog stream value>, ...
# Where <dogstream value> looks like:
# <log path>
# or
# <log path>:<module>:<parser function>
dogstreams = cls._instantiate_dogstreams(logger, config,
config.get('dogstreams', None))

logger.info("Dogstream parsers: %s" % repr(dogstreams))

return cls(logger, dogstreams)

def __init__(self, logger, dogstreams):
self.logger = logger
self.dogstreams = dogstreams

@classmethod
def _instantiate_dogstreams(cls, logger, config, dogstreams_config):
"""
Expecting dogstreams config value to look like:
<dogstream value>, <dog stream value>, ...
Where <dogstream value> looks like:
<log path>
or
<log path>:<module>:<parser function>
"""
# Load Dogstream objects from dogstreams.d.
dogstreams = cls._load_dogstreams_from_dir(logger, config)
if dogstreams_config:
# Create a Dogstream object for each <dogstream value>
for config_item in dogstreams_config.split(','):
try:
config_item = config_item.strip()
parts = windows_friendly_colon_split(config_item)
if len(parts) == 1:
dogstreams.append(Dogstream.init(logger, log_path=parts[0]))
# If the dogstream includes a wildcard, we'll add every
# matching path.
for path in cls._get_dogstream_log_paths(parts[0]):
dogstreams.append(Dogstream.init(logger, log_path=path))
elif len(parts) == 2:
logger.warn("Invalid dogstream: %s" % ':'.join(parts))
elif len(parts) >= 3:
Expand All @@ -63,14 +84,54 @@ def init(cls, logger, config):
config=config))
except Exception:
logger.exception("Cannot build dogstream")
return dogstreams

logger.info("Dogstream parsers: %s" % repr(dogstreams))
@classmethod
def _load_dogstreams_from_dir(cls, logger, config):
dogstreamsd_path = config.get('additional_dogstreamsd', None)
if not dogstreamsd_path:
return []

return cls(logger, dogstreams)
dogstreams = []
dogstream_yamls = glob.glob(os.path.join(dogstreamsd_path, "*.yaml"))
for dogstream_yaml in dogstream_yamls:
parsed = yaml.load(open(dogstream_yaml).read(), Loader=yLoader)
dogstreams += cls._dogstream_yaml_to_instance(logger, config, parsed)
return dogstreams

def __init__(self, logger, dogstreams):
self.logger = logger
self.dogstreams = dogstreams
@classmethod
def _dogstream_yaml_to_instance(cls, logger, config, parsed):
if 'dogstreams' not in parsed:
return []

dogstreams = []
for stream in parsed['dogstreams']:
# There is only one key in a stream, and it is always its name.
stream_name = stream.keys()[0]
conf = stream[stream_name]['conf']
if 'path' not in conf:
logger.error('No path section for dogstream: %s' % stream_name)
continue
stream_path = conf['path']
parser_path = conf.get('parser', None)
for log_path in cls._get_dogstream_log_paths(stream_path):
dogstream = Dogstream.init(
logger,
log_path=log_path,
parser_spec=parser_path,
parser_args=None,
config=config)
dogstreams.append(dogstream)
return dogstreams

@classmethod
def _get_dogstream_log_paths(cls, path):
"""
Paths may include wildcard *'s and ?'s.
"""
if '*' not in path:
return [path]
return glob.glob(path)

def check(self, agentConfig, move_end=True):
if not self.dogstreams:
Expand Down
1 change: 1 addition & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def get_config(parse_args=True, cfg_path=None, options=None):
'version': get_version(),
'watchdog': True,
'additional_checksd': '/etc/dd-agent/checks.d/',
'additional_dogstreamsd': '/etc/dd-agent/dogstreams.d/',
'bind_host': get_default_bind_host(),
'statsd_metric_namespace': None,
'utf8_decoding': False
Expand Down
3 changes: 3 additions & 0 deletions datadog.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ use_mount: no
# metric timestamp value key0=val0 key1=val1 ...
#

# Additional directory to look for Dogstreams
# additional_dogstreamsd: /etc/dd-agent/dogstreams.d/

# ========================================================================== #
# Custom Emitters #
# ========================================================================== #
Expand Down
5 changes: 5 additions & 0 deletions dogstreams.d/nginx.yaml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
dogstreams:
- nginx:
conf:
path: /var/log/nginx/*
parser: /path/to/my/parsers_module.py:custom_parser
12 changes: 9 additions & 3 deletions packaging/datadog-agent/win32/build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ cp ..\..\..\checks.d\* install_files\checks.d
mkdir install_files\conf.d
cp ..\..\..\conf.d\* install_files\conf.d

# Copy the dogstreams.d files into the install_files
mkdir install_files\dogstreams.d
cp ..\..\..\dogstreams.d\* install_files\dogstreams.d

# Copy JMX Fetch into the install_files
cp -R ..\..\..\dist\jmxfetch install_files\files\jmxfetch

Expand All @@ -50,10 +54,11 @@ cp ..\..\..\win32\status.html install_files\files
heat dir install_files\files -gg -dr INSTALLDIR -t wix\files.xslt -var var.InstallFilesBins -cg files -o wix\files.wxs
heat dir install_files\checks.d -gg -dr INSTALLDIR -var var.InstallFilesChecksD -cg checks.d -o wix\checksd.wxs
heat dir install_files\conf.d -gg -dr APPLIDATIONDATADIRECTORY -t wix\confd.xslt -var var.InstallFilesConfD -cg conf.d -o wix\confd.wxs
heat dir install_files\conf.d -gg -dr APPLIDATIONDATADIRECTORY -t wix\confd.xslt -var var.InstallFilesConfD -cg dogstreams.d -o wix\dogstreamsd.wxs

# Create .wixobj files from agent.wxs, confd.wxs, checksd.wxs
$opts = '-dInstallFiles=install_files', '-dWixRoot=wix', '-dInstallFilesChecksD=install_files\checks.d', '-dInstallFilesConfD=install_files\conf.d', '-dInstallFilesBins=install_files\files', "-dAgentVersion=$version"
candle $opts wix\agent.wxs wix\checksd.wxs wix\confd.wxs wix\files.wxs -ext WixUIExtension -ext WixUtilExtension
# Create .wixobj files from agent.wxs, confd.wxs, checksd.wxs, dogstreamsd.wxs
$opts = '-dInstallFiles=install_files', '-dWixRoot=wix', '-dInstallFilesChecksD=install_files\checks.d', '-dInstallFilesConfD=install_files\conf.d', '-dInstallFilesDogstreamsD=install_files\dogstreams.d', '-dInstallFilesBins=install_files\files', "-dAgentVersion=$version"
candle $opts wix\agent.wxs wix\checksd.wxs wix\confd.wxs wix\dogstreamsd.wxs wix\files.wxs -ext WixUIExtension -ext WixUtilExtension

# Light to create the msi
light agent.wixobj checksd.wixobj confd.wixobj files.wixobj -o ..\..\..\build\ddagent.msi -ext WixUIExtension -ext WixUtilExtension
Expand All @@ -66,6 +71,7 @@ rm -r install_files\files\gohai
rm install_files\files\*.*
rm -r install_files\conf.d
rm -r install_files\checks.d
rm -r install_files\dogstreams.d
rm -r install_files\Microsoft.VC90.CRT


Expand Down
108 changes: 96 additions & 12 deletions tests/test_datadog.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import logging
import unittest
from tempfile import NamedTemporaryFile
from tempfile import NamedTemporaryFile, gettempdir, mkdtemp
import re
import os

from checks.datadog import Dogstreams, EventDefaults, point_sorter
from checks.datadog import Dogstreams, EventDefaults
from util import yLoader

import yaml

log = logging.getLogger('datadog.test')

Expand Down Expand Up @@ -55,7 +58,7 @@ def parse_line(self, line):
"RECOVERY": "success"
}
def parse_events(logger, line):
""" Expecting lines like this:
""" Expecting lines like this:
2012-05-14 12:46:01 [ERROR] - host0 is down (broke its collarbone)
"""
match = log_event_pattern.match(line)
Expand All @@ -78,7 +81,7 @@ class TailTestCase(unittest.TestCase):
def setUp(self):
self.log_file = NamedTemporaryFile()
self.logger = logging.getLogger('test.dogstream')

def _write_log(self, log_data):
for data in log_data:
print >> self.log_file, data
Expand All @@ -101,7 +104,7 @@ def setUp(self):
log.info("Test config: %s" % self.config)
self.dogstream = Dogstreams.init(self.logger, self.config)
self.maxDiff = None

def test_dogstream_gauge(self):
log_data = [
# bucket 0
Expand All @@ -116,21 +119,21 @@ def test_dogstream_gauge(self):
('test.metric.a', '1000000006', '7', 'metric_type=gauge'),
('test.metric.a', '1000000007', '8', 'metric_type=gauge'),
]

expected_output = {
"dogstream": [
('test.metric.a', 1000000000, 5.0, self.gauge),
('test.metric.a', 1000000005, 8.0, self.gauge),
]
}

self._write_log((' '.join(data) for data in log_data))

actual_output = self.dogstream.check(self.config, move_end=False)
self.assertEquals(expected_output, actual_output)
for metric, timestamp, val, attr in expected_output['dogstream']:
assert isinstance(val, float)

def test_dogstream_counter(self):
log_data = [
# bucket 0
Expand All @@ -145,14 +148,14 @@ def test_dogstream_counter(self):
('test.metric.a', '1000000006', '7', 'metric_type=counter'),
('test.metric.a', '1000000007', '8', 'metric_type=counter'),
]

expected_output = {
"dogstream": [
('test.metric.a', 1000000000, 42, self.counter),
('test.metric.a', 1000000005, 27, self.counter),
]
}

self._write_log((' '.join(data) for data in log_data))

actual_output = self.dogstream.check(self.config, move_end=False)
Expand All @@ -170,9 +173,9 @@ def test_dogstream_bad_input(self):
expected_output = {"dogstream":
[('test_metric.e', 1000000000, 10, self.gauge)]
}

self._write_log(log_data)

actual_output = self.dogstream.check(self.config, move_end=False)
self.assertEquals(expected_output, actual_output)

Expand Down Expand Up @@ -438,6 +441,87 @@ def test_supervisord_parser(self):
actual_output = dogstream.check(self.config, move_end=False)
self.assertEquals(expected_output, actual_output)


class TestDogstreamLoading(TailTestCase):

def setUp(self):
TailTestCase.setUp(self)

self.dogstreamd_dir = mkdtemp()
self.config = {
'dogstreams': self.log_file.name,
'additional_dogstreamsd': self.dogstreamd_dir,
}

self.fictional_log = NamedTemporaryFile(
dir=self.dogstreamd_dir, suffix='.log', delete=False)
example_parser = NamedTemporaryFile(
dir=self.dogstreamd_dir, suffix='.py', delete=False)
example_parser.write(self._get_sample_dogstreamd_parser())
example_parser.close()
self.example_parser = example_parser

example_stream = NamedTemporaryFile(
dir=self.dogstreamd_dir, suffix='.yaml', delete=False)
example_stream.write(
self._get_sample_dogstreamd_yaml_string(example_parser.name))
example_stream.close()
self.example_stream = example_stream

def _get_sample_dogstreamd_parser(self):
return "def example_parser(logger, line): return 'Yay'"

def _get_sample_dogstreamd_yaml_string(self, parser_module_path):
return (
"""
dogstreams:
- nginx:
conf:
path: {fake_log_path}
parser: {fake_parser_module}:example_parser
""".format(
fake_log_path=os.path.join(self.dogstreamd_dir, "*.log"),
fake_parser_module=parser_module_path,
))

def test_dogstream_log_path_globbing(self):
"""Make sure that globbed dogstream logfile matching works."""
# Create a tmpfile to serve as a prefix for the other temporary
# files we'll be globbing.
first_tmpfile = NamedTemporaryFile()
tmp_fprefix = os.path.basename(first_tmpfile.name)
all_tmp_filenames = set([first_tmpfile.name])
# We stick the file objects in here to avoid garbage collection (and
# tmpfile deletion). Not sure why this was happening, but it's working
# with this hack in.
avoid_gc = []
for i in range(3):
new_tmpfile = NamedTemporaryFile(prefix=tmp_fprefix)
all_tmp_filenames.add(new_tmpfile.name)
avoid_gc.append(new_tmpfile)
dogstream_glob = os.path.join(gettempdir(), tmp_fprefix + '*')
paths = Dogstreams._get_dogstream_log_paths(dogstream_glob)
self.assertEqual(set(paths), all_tmp_filenames)

def test_dogstream_dir_loading(self):
"""Tests loading a directory full of YAML dogstreams."""
dogstreams = Dogstreams._load_dogstreams_from_dir(self.logger, self.config)
self.assertEqual(len(dogstreams), 1)

def test_dogstream_yaml_to_instance(self):
"""Tests the parsing of a dogstream YAML to a Dogstream instance."""
# Generate a test dogstream YAML config.
example_yaml = self._get_sample_dogstreamd_yaml_string(self.example_parser.name)
parsed = yaml.load(example_yaml, Loader=yLoader)
dogstreams = Dogstreams._dogstream_yaml_to_instance(
self.logger, self.config, parsed)
# Our example config was nginx.
nginx_s = dogstreams[0]
self.assertEqual(len(dogstreams), 1)
self.assertEqual(nginx_s.log_path, self.fictional_log.name)
self.assertEqual(nginx_s.parse_func.__name__, 'example_parser')


if __name__ == '__main__':
logging.basicConfig(format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s")
unittest.main()