Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shard spaces (rebased variant of PR #23) #40

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ In your graphite-api config file::
db: graphite
ssl: false
schema:
- ['', 60]
- ['high-res-metrics', 10]
- ['', [['', 30, '24h'], ['5m_', 300, '30d']]]
- ['high-res-metrics', [['', 10, '6h'], ['60s_', 60, '24h']]]



Expand All @@ -83,7 +83,7 @@ In graphite's ``local_settings.py``::
INFLUXDB_DB = "graphite"
INFLUXDB_SSL = "false"
INFLUXDB_SCHEMA = [
('', 60),
('high-res-metrics', 10)
('', [['', 30, '24h'], ['5m_', 300, '30d']]),
('high-res-metrics', [['', 10, '6h'], ['60s_', 60, '24h']])
]

84 changes: 74 additions & 10 deletions graphite_influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def normalize_config(config=None):
ret['schema'] = cfg.get('schema', [])
ret['log_file'] = cfg.get('log_file', None)
ret['log_level'] = cfg.get('log_level', 'info')
ret['display_metrics'] = cfg.get('display_metrics', '^[^\.]*')
else:
from django.conf import settings
ret['host'] = getattr(settings, 'INFLUXDB_HOST', 'localhost')
Expand All @@ -120,8 +121,54 @@ def normalize_config(config=None):
# Default log level is 'info'
ret['log_level'] = getattr(
settings, 'INFLUXDB_LOG_LEVEL', 'info')
ret['display_metrics'] = getattr(settings, 'INFLUXDB_DISPLAY_METRICS', '^[^\.]*')
return ret

def convert_time(value):
    """
    Convert a storage-time string such as ``'24h'`` or ``'30d'`` to seconds.

    The first ``<number><unit>`` pair found in ``value`` is used. Supported
    units are ``s`` (seconds), ``m`` (minutes), ``h`` (hours), ``d`` (days),
    ``w`` (weeks), ``M`` (months, counted as 31 days) and ``Y`` (years,
    counted as 365 days). The number may be fractional (``'1.5m'``).

    :param value: contains storage scheme info
    :return: number of seconds for storage scheme, always as an int;
             60 when no valid duration can be parsed from ``value``
    """
    # Fallback used when the string cannot be parsed. It is a plain number
    # of seconds so that callers can always do arithmetic on the result.
    default_seconds = 60

    seconds_per_unit = {
        's': 1,
        'm': 60,
        'h': 60 * 60,
        'd': 60 * 60 * 24,
        'w': 60 * 60 * 24 * 7,
        # TODO: determine month size
        'M': 60 * 60 * 24 * 31,
        'Y': 60 * 60 * 24 * 365,
    }

    match = re.search(r'([.0-9]+)([smhdwMY])', value)
    if match is None:
        return default_seconds

    try:
        multiplier = float(match.group(1))
    except ValueError:
        # The number group also accepts dot-only text such as '..',
        # which is not a valid float.
        return default_seconds

    return int(multiplier * seconds_per_unit[match.group(2)])


def determine_step(start_time, steps, now=None):
    """
    Pick the first step whose storage time reaches back to ``start_time``.

    Each entry of ``steps`` is indexed as ``entry[2]`` for its storage time
    in seconds; the entry itself is returned unchanged. Entries are tried in
    the order given, so the caller decides the preference order.

    :param start_time: how much data should we have (timestamp in seconds,
                       compared against the current time)
    :param steps: list of available steps
    :param now: optional current time in seconds; defaults to ``time.time()``.
                Pass one value to several calls to resolve them against the
                same instant.
    :return: appropriate step; the last entry of ``steps`` when none of them
             reaches back far enough
    :raises IndexError: if ``steps`` is empty
    """
    current_time = time.time() if now is None else now

    for candidate in steps:
        # One second of slack, as in the boundary check of the original code.
        oldest_covered = current_time - candidate[2] - 1
        if oldest_covered <= start_time:
            return candidate

    # Nothing covers the requested range: fall back to the last entry.
    return steps[-1]


class InfluxdbReader(object):
__slots__ = ('client', 'path', 'step', 'cache')

Expand All @@ -137,9 +184,12 @@ def fetch(self, start_time, end_time):
# until is inclusive (until=bar returns data at ts=bar and lower)
# influx doesn't support <= and >= yet, hence the add.
logger.debug(caller="fetch()", start_time=start_time, end_time=end_time, step=self.step, debug_key=self.path)
known_points = []
step = determine_step(start_time, self.step)

with statsd.timer('service=graphite-api.ext_service=influxdb.target_type=gauge.unit=ms.what=query_individual_duration'):
_query = 'select time, value from "%s" where time > %ds and time < %ds order asc' % (
self.path, start_time, end_time + 1)
_query = 'select time, value from "%s%s" where time > %ds and time < %ds order asc' % (
step[0], self.path, start_time, end_time + 1)
logger.debug("Calling influxdb with query - %s" % _query)
data = self.client.query(_query)
logger.debug(caller="fetch()", returned_data=data, debug_key=self.path)
Expand All @@ -149,7 +199,7 @@ def fetch(self, start_time, end_time):
logger.debug(caller="fetch()", msg="COULDN'T READ POINTS. SETTING TO EMPTY LIST", debug_key=self.path)
known_points = []
logger.debug(caller="fetch()", msg="invoking fix_datapoints()", debug_key=self.path)
datapoints = InfluxdbReader.fix_datapoints(known_points, start_time, end_time, self.step, self.path)
datapoints = InfluxdbReader.fix_datapoints(known_points, start_time, end_time, step[1], self.path)

time_info = start_time, end_time, self.step
return time_info, datapoints
Expand Down Expand Up @@ -237,7 +287,7 @@ class InfluxLeafNode(LeafNode):

class InfluxdbFinder(object):
__fetch_multi__ = 'influxdb'
__slots__ = ('client', 'schemas', 'cache')
__slots__ = ('client', 'schemas', 'cache', 'display_metrics')

def __init__(self, config=None):
# Shouldn't be trying imports in __init__.
Expand All @@ -247,7 +297,11 @@ def __init__(self, config=None):
self.cache = _CACHE
config = normalize_config(config)
self.client = InfluxDBClient(config['host'], config['port'], config['user'], config['passw'], config['db'], config['ssl'])
self.schemas = [(re.compile(patt), step) for (patt, step) in config['schema']]
self.schemas = [(
re.compile(name_prefix),
[[prefix, step, convert_time(storage_time)] for [prefix, step, storage_time] in steps]
) for (name_prefix, steps) in config['schema']]
self.display_metrics = config['display_metrics']
self._setup_logger(config['log_level'], config['log_file'])

def _setup_logger(self, level, log_file):
Expand Down Expand Up @@ -298,6 +352,11 @@ def compile_regex(self, query, series=False):
"""Turn wildcard queries into compiled regex
* becomes .*
. becomes \."""
# Special case when user requests all metrics (like getting list for dashboard)
if series and query.pattern == "*":
regex = self.display_metrics
return re.compile(regex)

regex = '^{0}'
if not series:
regex += '$'
Expand Down Expand Up @@ -379,17 +438,22 @@ def find_nodes(self, query):
yield BranchNode(name)

def fetch_multi(self, nodes, start_time, end_time):
series = ', '.join(['"%s"' % node.path for node in nodes])
actual_step = None
for node in nodes:
step = determine_step(start_time, node.reader.step)
if actual_step is None or actual_step[1] < step[1]:
actual_step = step

series = ', '.join(['"%s%s"' % (actual_step[0], node.path) for node in nodes])
# use the step of the node that is the most coarse
# not sure if there's a better way? can we combine series
# with different steps (and use the optimal step for each?)
# probably not
step = max([node.reader.step for node in nodes])
query = 'select time, value from %s where time > %ds and time < %ds order asc' % (
series, start_time, end_time + 1)
logger.debug('fetch_multi - %s', query)
logger.debug('fetch_multi - start_time: %s - end_time: %s, step %s',
print_time(start_time), print_time(end_time), step)
print_time(start_time), print_time(end_time), step[1])

with statsd.timer('service=graphite-api.ext_service=influxdb.target_type=gauge.unit=ms.action=select_datapoints'):
logger.debug("Calling influxdb multi fetch with query - %s", query)
Expand All @@ -403,7 +467,7 @@ def fetch_multi(self, nodes, start_time, end_time):

with statsd.timer('service=graphite-api.action=fix_datapoints_multi.target_type=gauge.unit=ms'):
logger.debug('fetch_multi - action %s', 'invoking fix_datapoints_multi()')
datapoints = InfluxdbReader.fix_datapoints_multi(data, start_time, end_time, step)
datapoints = InfluxdbReader.fix_datapoints_multi(data, start_time, end_time, step[1])

time_info = start_time, end_time, step
time_info = start_time, end_time, step[1]
return time_info, datapoints