Skip to content

Commit

Permalink
[CrateDB] Add basic data acquisition support for CrateDB
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jun 9, 2023
1 parent 5950201 commit f774a90
Show file tree
Hide file tree
Showing 15 changed files with 497 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ MOSQUITTO_VERSION=2.0
MOSQUITTO_MQTT_PORT=1883
MOSQUITTO_WS_PORT=9001

# CrateDB
CRATEDB_VERSION=latest
CRATEDB_HTTP_PORT=4200
CRATEDB_POSTGRESQL_PORT=5432

# InfluxDB
INFLUXDB_VERSION=1.8
INFLUXDB_HTTP_PORT=8086
Expand Down
9 changes: 8 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,19 @@ jobs:
os: [ ubuntu-20.04 ] # , macos-latest, windows-latest ]
python-version: [ "3.7", "3.8", "3.9", "3.10", "3.11" ]
mosquitto-version: [ "2.0" ]
cratedb-version: [ "5.3" ]
influxdb-version: [ "1.8" ]
grafana-version: [ "7.5.17", "8.5.15", "9.3.0" ]

# https://docs.github.com/en/free-pro-team@latest/actions/guides/about-service-containers
services:

cratedb:
image: crate:${{ matrix.cratedb-version }}
ports:
- 4200:4200
- 5432:5432

influxdb:
image: influxdb:${{ matrix.influxdb-version }}
ports:
Expand All @@ -53,7 +60,7 @@ jobs:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}

name: Python ${{ matrix.python-version }}, Grafana ${{ matrix.grafana-version }}, Mosquitto ${{ matrix.mosquitto-version }}, InfluxDB ${{ matrix.influxdb-version }}
name: Py ${{ matrix.python-version }}, Grafana ${{ matrix.grafana-version }}, Mosquitto ${{ matrix.mosquitto-version }}, InfluxDB ${{ matrix.influxdb-version }}, CrateDB ${{ matrix.cratedb-version }}
steps:

- name: Acquire sources
Expand Down
3 changes: 3 additions & 0 deletions doc/source/development/tests.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ Run specific tests with maximum verbosity::
# Run tests marked with "tasmota", "homie" or "airrohr".
pytest test ${PYTEST_OPTIONS} -m 'tasmota or homie or airrohr'

# Run tests with CrateDB as database backend.
pytest test ${PYTEST_OPTIONS} -m cratedb

To see available markers, type::

pytest --markers
13 changes: 13 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ services:
- ${PATH_VAR_LIB}/mosquitto:/mosquitto/data
- ${PATH_VAR_LOG}/mosquitto:/mosquitto/log

cratedb:
image: crate:${CRATEDB_VERSION}
ports:
- "${CRATEDB_HTTP_PORT}:${CRATEDB_HTTP_PORT}"
- "${CRATEDB_POSTGRESQL_PORT}:${CRATEDB_POSTGRESQL_PORT}"
environment:
CRATE_HEAP_SIZE: 2g

command: ["crate",
"-Cdiscovery.type=single-node",
"-Ccluster.routing.allocation.disk.threshold_enabled=false",
]

# https://github.com/robcowart/docker_compose_cookbook/blob/master/STACKS/influx_oss/docker-compose.yml#L21
influxdb:
image: influxdb:${INFLUXDB_VERSION}
Expand Down
41 changes: 41 additions & 0 deletions etc/test/cratedb.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
; ######################################
; Kotori test configuration with CrateDB
; ######################################


; =====================
; Connectivity settings
; =====================

; MQTT bus adapter
[mqtt]
host = localhost
#port = 1883
username = kotori
password = kotori

; Storage adapter
[cratedb]
; host = localhost
; port = 4200
; username = crate
; password =

; User interface
[grafana]
host = localhost
#port = 3000
username = admin
password = admin


; ================
; Channel settings
; ================

[mqttkit-2]
enable = true
type = application
realm = mqttkit-2
mqtt_topics = mqttkit-2/#
application = kotori.daq.application.mqttkit:mqttkit_application
1 change: 1 addition & 0 deletions etc/test/main.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ http_listen = localhost
http_port = 24642

; TODO: Implement backend database selection.
; use_database = cratedb
; use_database = influxdb

; mqtt bus adapter
Expand Down
16 changes: 13 additions & 3 deletions kotori/daq/services/mig.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# (c) 2015-2021 Andreas Motl <andreas@getkotori.org>
# (c) 2015-2023 Andreas Motl <andreas@getkotori.org>
import os
import time
import json

Expand All @@ -17,6 +18,7 @@
from kotori.daq.decoder.schema import MessageType, TopicMatchers
from kotori.daq.services import MultiServiceMixin
from kotori.daq.intercom.mqtt import MqttAdapter
from kotori.daq.storage.cratedb import CrateDBAdapter
from kotori.daq.storage.influx import InfluxDBAdapter
from kotori.util.configuration import read_list
from kotori.util.thimble import Thimble
Expand Down Expand Up @@ -79,7 +81,14 @@ def setupService(self):

self.registerService(self.mqtt_service)

self.influx = InfluxDBAdapter(settings = self.settings.influxdb)
# TODO: Support multiple databases at the same time.
log.info("Creating database adapter")
if "influxdb" in self.settings:
self.database = InfluxDBAdapter(settings=self.settings.influxdb)
elif "cratedb" in self.settings:
self.database = CrateDBAdapter(settings = self.settings.cratedb)
else:
log.warn("No time-series database configured")

# Perform MQTT message processing using a different thread pool
self.threadpool = ThreadPool()
Expand Down Expand Up @@ -311,7 +320,8 @@ def store_message(self, storage, data):
:param storage: The storage location object
:param data: The data ready for storing
"""
self.influx.write(storage, data)
if self.database is not None:
self.database.write(storage, data)

def mqtt_process_error(self, failure, topic, payload):
"""
Expand Down
193 changes: 193 additions & 0 deletions kotori/daq/storage/cratedb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# -*- coding: utf-8 -*-
# (c) 2023 Andreas Motl <andreas@getkotori.org>
import calendar
import json
from decimal import Decimal
from copy import deepcopy
from datetime import datetime, date

import crate.client.http
import pytz
import requests
from crate import client
from crate.client.exceptions import ProgrammingError
from funcy import project
from twisted.logger import Logger

from kotori.daq.storage.util import format_chunk

# Module-level Twisted logger used by the storage adapter code in this module.
log = Logger()


class CrateDBAdapter(object):
    """
    Kotori database backend adapter for CrateDB.

    CrateDB is a distributed SQL database for storing and analyzing
    massive amounts of data in real-time. Built on top of Lucene.

    https://github.com/crate/crate
    """

    def __init__(self, settings=None, database=None):
        """
        Carry over connectivity parameters and connect to the database.

        :param settings: Mapping with connection settings. Missing keys fall
            back to ``host=localhost``, ``port=4200``, ``username=crate`` and
            an empty ``password``. The mapping is deep-copied, so the caller's
            object is not modified.
        :param database: Fallback for the ``database`` setting when the
            settings mapping does not provide one.

        TODO: Verify with CrateDB Cloud.
        """

        settings = deepcopy(settings) or {}
        settings.setdefault("host", "localhost")
        settings.setdefault("port", "4200")
        settings.setdefault("username", "crate")
        settings.setdefault("password", "")
        settings.setdefault("database", database)

        # TODO: Bring back pool size configuration.
        # settings.setdefault('pool_size', 10)

        # Configuration values may arrive as strings; the port is used as a number.
        settings["port"] = int(settings["port"])

        # FIXME: This is bad style. Well, but it is currently
        #        inherited from ~10 year old code, so c'est la vie.
        # Every settings key becomes an instance attribute (`self.host`, ...).
        self.__dict__.update(**settings)

        # Bookkeeping for all databases which have been written to already.
        self.databases_written_once = set()

        self.host_uri = "{host}:{port}".format(**self.__dict__)

        # TODO: Bring back pool size configuration.
        # log.info('Storage target is {uri}, pool size is {pool_size}', uri=self.host_uri, pool_size=self.pool_size)
        log.info("Storage target is {uri}", uri=self.host_uri)
        self.db_client = client.connect(
            self.host_uri, username=self.username, password=self.password, pool_size=20,
        )

    def get_tablename(self, meta):
        """
        Get table name for SensorWAN channel.

        :param meta: Object providing ``database`` and ``measurement`` attributes.
        :return: The string ``<database>.<measurement>``.

        NOTE(review): The result is interpolated verbatim into the SQL
        statements built by `create_table` and `write_chunk`. Presumably both
        components are sanitized before they get here -- TODO confirm.
        """
        return f"{meta.database}.{meta.measurement}"

    def create_table(self, tablename):
        """
        Create database table for SensorWAN channel, unless it already exists.

        :param tablename: Table name, as returned by `get_tablename`.
        """
        # Hand the table name over as an event field instead of baking it into
        # the format string, so braces in the name can not break log formatting.
        log.info("Creating table: {tablename}", tablename=tablename)
        sql_ddl = f"""
            CREATE TABLE IF NOT EXISTS {tablename} (
                time TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
                tags OBJECT(DYNAMIC),
                fields OBJECT(DYNAMIC)
            );
        """
        cursor = self.db_client.cursor()
        try:
            cursor.execute(sql_ddl)
        finally:
            # Release the cursor even when the DDL statement fails.
            cursor.close()

    def write(self, meta, data):
        """
        Format ingress data chunk and store it into database table.

        When the first insert fails with a `ProgrammingError` mentioning
        ``SchemaUnknownException``, the table is created and the insert is
        attempted a second time.

        :param meta: Channel information, providing ``database`` and ``measurement``.
        :param data: The data to store, handed to `format_chunk`.
        :return: ``True`` when the record has been stored.
        :raises Exception: Formatting, connection, and other database errors
            are logged where applicable and re-raised.

        TODO: This dearly needs efficiency improvements. Currently, there is no
              batching, just single records/inserts. That yields bad performance.
        """

        # Snapshot the inputs before formatting, so the log message on failure
        # reports them as they have been received.
        meta_copy = deepcopy(dict(meta))
        data_copy = deepcopy(data)

        try:
            chunk = format_chunk(meta, data)

        except Exception as ex:
            log.failure(
                "Could not format chunk (ex={ex_name}: {ex}): data={data}, meta={meta}",
                ex_name=ex.__class__.__name__,
                ex=ex,
                meta=meta_copy,
                data=data_copy,
            )
            raise

        try:
            return self.write_chunk(meta, chunk)

        except requests.exceptions.ConnectionError as ex:
            log.failure(
                "Problem connecting to CrateDB at {uri}: {ex}", uri=self.host_uri, ex=ex
            )
            raise

        except ProgrammingError as ex:
            # Do not depend on a `message` attribute being present: fall back
            # to the string representation, so a missing attribute can not
            # mask the original database error with an `AttributeError`.
            message = getattr(ex, "message", None) or str(ex)
            if "SchemaUnknownException" not in message:
                raise

            self.create_table(self.get_tablename(meta))

            # Attempt second write
            return self.write_chunk(meta, chunk)

    def write_chunk(self, meta, chunk):
        """
        Run the SQL `INSERT` operation.

        :param meta: Channel information, providing ``database`` and ``measurement``.
        :param chunk: Mapping with ``tags`` and ``fields``, and optionally ``time``.
        :return: ``True``. Failures are signalled by the exception raised from
            the database driver.
        """
        db_table = self.get_tablename(meta)
        cursor = self.db_client.cursor()

        try:
            # With or without timestamp.
            if "time" in chunk:
                cursor.execute(
                    f"INSERT INTO {db_table} (time, tags, fields) VALUES (?, ?, ?)",
                    (chunk["time"], chunk["tags"], chunk["fields"]),
                )
            else:
                cursor.execute(
                    f"INSERT INTO {db_table} (tags, fields) VALUES (?, ?)",
                    (chunk["tags"], chunk["fields"]),
                )
        finally:
            # Release the cursor even when the insert fails.
            cursor.close()

        # Only reached when the insert did not raise.
        self.databases_written_once.add(meta.database)
        log.debug("Storage success: {chunk}", chunk=chunk)
        return True

    @staticmethod
    def get_tags(data):
        """
        Derive tags from topology information.

        :param data: Mapping to pick the ``gateway`` and ``node`` keys from.

        TODO: Verify if this is used at all.
        """
        return project(data, ["gateway", "node"])


class TimezoneAwareCrateJsonEncoder(json.JSONEncoder):
    """
    JSON encoder with support for decimal and temporal values.

    - ``Decimal`` values are serialized as their string representation.
    - ``datetime`` values are serialized as integer milliseconds elapsed since
      1970-01-01. Timezone-aware values are measured against the UTC epoch,
      naive values against the naive epoch.
    - ``date`` values are serialized as milliseconds, derived from
      ``calendar.timegm``.
    """

    epoch_aware = datetime(1970, 1, 1, tzinfo=pytz.UTC)
    epoch_naive = datetime(1970, 1, 1)

    def default(self, o):
        """
        Convert objects the stock encoder can not handle.

        Anything not covered here is delegated to the base class.
        """
        if isinstance(o, Decimal):
            return str(o)

        # `datetime` is a subclass of `date`, so it needs to be probed first.
        if isinstance(o, datetime):
            epoch = self.epoch_aware if o.tzinfo else self.epoch_naive
            elapsed = o - epoch
            whole_seconds = elapsed.seconds + elapsed.days * 24 * 3600
            return int(elapsed.microseconds / 1000.0 + whole_seconds * 1000.0)

        if isinstance(o, date):
            return calendar.timegm(o.timetuple()) * 1000

        return super().default(o)


# Monkey patch: Replace the JSON encoder class exposed by `crate.client.http`
# with the timezone-aware variant defined above. This runs as a side effect of
# importing this module.
# NOTE(review): Presumably the driver looks up `CrateJsonEncoder` on its module
# at serialization time, so the replacement takes effect -- TODO confirm.
# TODO: Submit upstream.
crate.client.http.CrateJsonEncoder = TimezoneAwareCrateJsonEncoder
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ markers =
http: Tests using HTTP.
export: Tests for exporting data.
mqtt: Tests only doing MQTT.
cratedb: Tests specific to CrateDB.
influxdb: Tests specific to InfluxDB.
grafana: Tests interacting with Grafana.
mongodb: Tests using MongoDB.
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@

extras = {
'daq': [
'crash<1',
'crate[sqlalchemy]<1',
'influxdb>=5.3.0,<6',
'pytz>=2020.1',
'requests>=2.12.4,<3',
Expand Down
4 changes: 3 additions & 1 deletion test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from kotori import KotoriBootloader
from test.util import boot_kotori, sleep
from test.settings.mqttkit import influx_sensors, influx_events, grafana, device_influx_sensors
from test.settings.mqttkit import cratedb_sensors, influx_sensors, influx_events, grafana, device_influx_sensors

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,6 +48,8 @@ def machinery():


machinery = create_machinery('./etc/test/main.ini')
machinery_cratedb = create_machinery('./etc/test/cratedb.ini')
reset_cratedb = cratedb_sensors.make_reset_measurement()
create_influxdb = influx_sensors.make_create_db()
reset_influxdb = influx_sensors.make_reset_measurement()
reset_grafana = grafana.make_reset()
Expand Down
Loading

0 comments on commit f774a90

Please sign in to comment.