Skip to content

Commit

Permalink
[CrateDB] Add support for Grafana instant dashboards
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jun 10, 2023
1 parent d68974a commit 8abe55d
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 24 deletions.
25 changes: 22 additions & 3 deletions kotori/daq/graphing/grafana/dashboard.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# (c) 2015-2021 Andreas Motl, <andreas@getkotori.org>
# (c) 2015-2023 Andreas Motl, <andreas@getkotori.org>
import os
import json

Expand All @@ -10,13 +10,17 @@
from twisted.logger import Logger
from pyramid.settings import asbool

from kotori.daq.model import TimeseriesDatabaseType

log = Logger()

@attr.s
class GrafanaDashboardModel(object):
name = attr.ib()
title = attr.ib()
datasource = attr.ib()
database_type: TimeseriesDatabaseType = attr.ib()
database_name = attr.ib()
measurement_sensors = attr.ib()
measurement_events = attr.ib()
uid = attr.ib(default=None)
Expand Down Expand Up @@ -49,6 +53,7 @@ def make(self, data=None):
# Wrap everything into convenience object
dashboard = GrafanaDashboard(
channel=self.channel,
model=self.model,
uid=dashboard_uid,
title=dashboard_title,
datasource=datasource,
Expand Down Expand Up @@ -302,8 +307,9 @@ def use_field(field_name: str):

class GrafanaDashboard(object):

def __init__(self, channel=None, uid=None, title='default', datasource='default', folder_id=None, dashboard_data=None):
def __init__(self, channel=None, model=None, uid=None, title='default', datasource='default', folder_id=None, dashboard_data=None):
self.channel = channel or Munch()
self.model: GrafanaDashboardModel = model
self.dashboard_uid = uid
self.dashboard_title = title
self.datasource = datasource
Expand Down Expand Up @@ -351,11 +357,17 @@ def __init__(self, channel=None, uid=None, title='default', datasource='default'
if panel_ids:
self.panel_id = max(panel_ids)

if self.model.database_type is TimeseriesDatabaseType.CRATEDB:
target_template = 'grafana-target-cratedb.json'
elif self.model.database_type is TimeseriesDatabaseType.INFLUXDB1:
target_template = 'grafana-target-influxdb1.json'
else:
raise ValueError(f"Unknown database type: {self.model.database_type}")

self.tpl_dashboard = self.get_template('grafana-dashboard.json')
self.tpl_annotation = self.get_template('grafana-annotation.json')
self.tpl_panel = self.get_template('grafana-panel.json')
self.tpl_target = self.get_template('grafana-target.json')
self.tpl_target = self.get_template(target_template)

def get_template(self, filename):
filename = os.path.join('resources', filename)
Expand Down Expand Up @@ -450,8 +462,15 @@ def build_panel(self, panel, measurement):
log.failure(u'Failed building valid JSON for Grafana panel. data={data}, json={json}',
data=data_panel, json=panel_json)

def get_tablename(self):
"""
Produce full-qualified table name, like `<database>.<table>`, or `<schema>.<table>`.
"""
return f"{self.model.database_name}.{self.model.measurement_sensors}"

def get_target(self, panel, measurement, fieldname):
data_target = {
'table': self.get_tablename(),
'measurement': measurement,
'name': fieldname,
'alias': fieldname,
Expand Down
47 changes: 38 additions & 9 deletions kotori/daq/graphing/grafana/manager.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# -*- coding: utf-8 -*-
# (c) 2015-2021 Andreas Motl, <andreas@getkotori.org>
# (c) 2015-2023 Andreas Motl, <andreas@getkotori.org>
import arrow
from twisted.logger import Logger
from twisted.application.service import MultiService

from kotori.daq.model import TimeseriesDatabaseType
from kotori.daq.services import MultiServiceMixin
from kotori.daq.graphing.grafana.api import GrafanaApi
from kotori.daq.graphing.grafana.dashboard import GrafanaDashboardBuilder, GrafanaDashboardModel
Expand All @@ -25,6 +26,13 @@ def __init__(self, settings=None, channel=None):
# Shortcut to global settings
self.config = settings

if "cratedb" in self.config:
self.dbtype = TimeseriesDatabaseType.CRATEDB
elif "influxdb" in self.config:
self.dbtype = TimeseriesDatabaseType.INFLUXDB1
else:
raise ValueError("Timeseries database type not defined")

if not 'port' in self.config['grafana']:
self.config['grafana']['port'] = '3000'

Expand Down Expand Up @@ -76,15 +84,34 @@ def create_datasource(self, storage_location):

datasource_name = storage_location.database

self.grafana_api.create_datasource(datasource_name, {
"type": "influxdb",
"url": "http://{host}:{port}/".format(
host=self.config['influxdb']['host'],
port=int(self.config['influxdb'].get('port', '8086'))),
"database": storage_location.database,
"user": self.config['influxdb']['username'],
"password": self.config['influxdb']['password'],
if self.dbtype is TimeseriesDatabaseType.CRATEDB:
db_config = self.config['cratedb']
self.grafana_api.create_datasource(datasource_name, {
"type": "postgres",
"url": "{host}:{port}".format(
host=db_config.get('host', 'localhost'),
port=int(db_config.get('port', '5432'))),
"database": storage_location.database,
"user": db_config.get('username', 'crate'),
"password": db_config.get('password'),
"jsonData": {
"sslmode": "disable",
"postgresVersion": 1400,
},
})
elif self.dbtype is TimeseriesDatabaseType.INFLUXDB1:
self.grafana_api.create_datasource(datasource_name, {
"type": "influxdb",
"url": "http://{host}:{port}/".format(
host=self.config['influxdb']['host'],
port=int(self.config['influxdb'].get('port', '8086'))),
"database": storage_location.database,
"user": self.config['influxdb']['username'],
"password": self.config['influxdb']['password'],
})
else:
log.warn("No time-series database enabled, skipping Grafana provisioning")


return datasource_name

Expand Down Expand Up @@ -145,6 +172,8 @@ def provision(self, storage_location, data, topology=None):
name=dashboard_identity.name,
title=dashboard_identity.title,
datasource=datasource_name,
database_type=self.dbtype,
database_name=storage_location.database,
measurement_sensors=storage_location.measurement,
measurement_events=storage_location.measurement_events
)
Expand Down
10 changes: 10 additions & 0 deletions kotori/daq/graphing/grafana/resources/grafana-target-cratedb.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"alias": "{{ alias }}",
"format": "table",
"resultFormat": "time_series",
"tags": {{ tags }},
"groupByTags": [],
"measurement": "{{ measurement }}",
"rawQuery": true,
"rawSql": "SELECT time, fields['{{ name }}'] AS {{ alias }} FROM {{ table }} WHERE $__timeFilter(time)"
}
8 changes: 8 additions & 0 deletions kotori/daq/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
# (c) 2023 Andreas Motl, <andreas@getkotori.org>
from enum import Enum


class TimeseriesDatabaseType(Enum):
CRATEDB = "cratedb"
INFLUXDB1 = "influxdb"
4 changes: 3 additions & 1 deletion test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest_twisted

from kotori import KotoriBootloader
from kotori.daq.model import TimeseriesDatabaseType
from test.util import boot_kotori, sleep
from test.settings.mqttkit import cratedb_sensors, influx_sensors, influx_events, grafana, device_influx_sensors

Expand Down Expand Up @@ -52,7 +53,8 @@ def machinery():
reset_cratedb = cratedb_sensors.make_reset_measurement()
create_influxdb = influx_sensors.make_create_db()
reset_influxdb = influx_sensors.make_reset_measurement()
reset_grafana = grafana.make_reset()
reset_grafana = grafana.make_reset(dbtype=TimeseriesDatabaseType.INFLUXDB1)
reset_grafana_cratedb = grafana.make_reset(dbtype=TimeseriesDatabaseType.CRATEDB)
reset_influxdb_events = influx_events.make_reset_measurement()

device_create_influxdb = device_influx_sensors.make_create_db()
Expand Down
1 change: 1 addition & 0 deletions test/settings/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TestSettings:
cratedb_measurement_sensors = 'foo_bar_sensors'
cratedb_measurement_events = 'foo_bar_events'
mqtt2_topic_json = 'mqttkit-2/itest/foo/bar/data.json'
grafana2_dashboards = ['mqttkit-2-itest', 'mqttkit-2-itest3']

# InfluxDB settings.
influx_database = 'mqttkit_1_itest'
Expand Down
44 changes: 40 additions & 4 deletions test/test_daq_grafana.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# (c) 2020-2021 Andreas Motl <andreas@getkotori.org>
# (c) 2020-2023 Andreas Motl <andreas@getkotori.org>
import logging

import pytest
Expand All @@ -13,10 +13,11 @@

@pytest_twisted.inlineCallbacks
@pytest.mark.grafana
def test_mqtt_to_grafana_single(machinery, create_influxdb, reset_influxdb, reset_grafana):
@pytest.mark.influxdb
def test_mqtt_influxdb_grafana_single(machinery, create_influxdb, reset_influxdb, reset_grafana):
"""
Publish single reading in JSON format to MQTT broker and proof
that a corresponding datasource and a dashboard was created in Grafana.
Publish single reading in JSON format to MQTT broker and proof that a
corresponding InfluxDB datasource and a dashboard was created in Grafana.
"""

# Submit a single measurement, without timestamp.
Expand Down Expand Up @@ -45,6 +46,41 @@ def test_mqtt_to_grafana_single(machinery, create_influxdb, reset_influxdb, rese
assert 'temperature' in target['query'] or 'humidity' in target['query']


@pytest_twisted.inlineCallbacks
@pytest.mark.grafana
@pytest.mark.cratedb
def test_mqtt_cratedb_grafana_single(machinery_cratedb, reset_cratedb, reset_grafana_cratedb):
"""
Publish single reading in JSON format to MQTT broker and proof that a
corresponding CrateDB datasource and a dashboard was created in Grafana.
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 42.84,
'humidity': 83.1,
}
yield mqtt_json_sensor(settings.mqtt2_topic_json, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)
yield sleep(PROCESS_DELAY_MQTT)
yield sleep(PROCESS_DELAY_MQTT)

# Proof that Grafana is well provisioned.
logger.info('Grafana: Checking datasource')
assert settings.cratedb_database in grafana.get_datasource_names()

logger.info('Grafana: Retrieving dashboard')
dashboard_name = settings.grafana2_dashboards[0]
dashboard = grafana.get_dashboard_by_name(dashboard_name)

logger.info('Grafana: Checking dashboard layout')
target = dashboard['rows'][0]['panels'][0]['targets'][0]
assert target['measurement'] == settings.cratedb_measurement_sensors
assert target['rawSql'] == "SELECT time, fields['humidity'] AS humidity FROM mqttkit_2_itest.foo_bar_sensors WHERE $__timeFilter(time)"


@pytest_twisted.inlineCallbacks
@pytest.mark.grafana
def test_mqtt_to_grafana_update_panel(machinery, create_influxdb, reset_influxdb, reset_grafana):
Expand Down
25 changes: 18 additions & 7 deletions test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import kotori
from kotori.daq.graphing.grafana.manager import GrafanaManager
from kotori.daq.model import TimeseriesDatabaseType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -90,10 +91,19 @@ def get_field_names(self, dashboard_name, panel_index):
field_names = sorted(map(lambda x: x["fields"][0]["name"], panels[panel_index]['targets']))
return field_names

def make_reset(self):
def make_reset(self, dbtype: TimeseriesDatabaseType = TimeseriesDatabaseType.INFLUXDB1):

if dbtype is TimeseriesDatabaseType.CRATEDB:
database = self.settings.cratedb_database
databases = getattr(self.settings, "cratedb_databases", [])
dashboards = self.settings.grafana2_dashboards
elif dbtype is TimeseriesDatabaseType.INFLUXDB1:
database = self.settings.influx_database
databases = getattr(self.settings, "influx_databases", [])
dashboards = self.settings.grafana_dashboards

@pytest.fixture(scope="function")
def reset_grafana(machinery):
def resetfun(machinery, machinery_cratedb):
"""
Fixture to delete the Grafana datasource and dashboard.
"""
Expand All @@ -103,13 +113,12 @@ def reset_grafana(machinery):
for datasource in self.client.datasources.get():
datasource_name = datasource['name']
logger.info(f"Attempt to delete datasource {datasource_name}")
if datasource_name == self.settings.influx_database or \
datasource_name in getattr(self.settings, "influx_databases", []):
if datasource_name == database or datasource_name in databases:
datasource_id = datasource['id']
self.client.datasources[datasource_id].delete()
logger.info(f"Successfully deleted datasource {datasource_name}")

for dashboard_name in self.settings.grafana_dashboards:
for dashboard_name in dashboards:
logger.info(f"Attempt to delete dashboard {dashboard_name}")
try:
dashboard = self.get_dashboard_by_name(dashboard_name)
Expand All @@ -121,14 +130,16 @@ def reset_grafana(machinery):
raise

# Find all `GrafanaManager` service instances and invoke `KeyCache.reset()` on them.
if machinery:
for machinery in [machinery, machinery_cratedb]:
if machinery is None:
continue
for app in machinery.applications:
for service in app.services:
for subservice in service.services:
if isinstance(subservice, GrafanaManager):
subservice.keycache.reset()

return reset_grafana
return resetfun


class InfluxWrapper:
Expand Down

0 comments on commit 8abe55d

Please sign in to comment.