Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fixed timseries structure for storing signal metric #586 #604

Merged
merged 4 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions openwisp_monitoring/device/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ def write(self, data, time=None, current=False):
self.write_device_metrics = []
for interface in data.get('interfaces', []):
ifname = interface['name']
extra_tags = Metric._sort_dict(device_extra_tags)
if 'mobile' in interface:
self._write_mobile_signal(
interface, ifname, ct, self.device_data.pk, current, time=time
Expand All @@ -98,7 +97,7 @@ def write(self, data, time=None, current=False):
name=name,
key='traffic',
main_tags={'ifname': Metric._makekey(ifname)},
extra_tags=extra_tags,
extra_tags=device_extra_tags,
)
self._append_metric_data(
metric, field_value, current, time=time, extra_values=extra_values
Expand All @@ -119,7 +118,7 @@ def write(self, data, time=None, current=False):
name=name,
key='wifi_clients',
main_tags={'ifname': Metric._makekey(ifname)},
extra_tags=extra_tags,
extra_tags=device_extra_tags,
)
# avoid tsdb overwrite clients
client_time = time
Expand Down Expand Up @@ -172,7 +171,7 @@ def _get_extra_tags(self, device):
tags['location_id'] = str(device_location.location_id)
if device_location.floorplan_id:
tags['floorplan_id'] = str(device_location.floorplan_id)
return tags
return Metric._sort_dict(tags)

def _get_mobile_signal_type(self, signal):
if not signal:
Expand Down Expand Up @@ -212,7 +211,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No
content_type_id=ct.id,
configuration='signal_strength',
name='signal strength',
key=ifname,
key='signal',
main_tags={'ifname': Metric._makekey(ifname)},
)
self._append_metric_data(
metric, signal_strength, current, time=time, extra_values=extra_values
Expand Down Expand Up @@ -240,7 +240,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No
content_type_id=ct.id,
configuration='signal_quality',
name='signal quality',
key=ifname,
key='signal',
main_tags={'ifname': Metric._makekey(ifname)},
)
self._append_metric_data(
metric, signal_quality, current, time=time, extra_values=extra_values
Expand All @@ -253,7 +254,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No
content_type_id=ct.id,
configuration='access_tech',
name='access technology',
key=ifname,
key='signal',
main_tags={'ifname': Metric._makekey(ifname)},
)
self._append_metric_data(
metric,
Expand Down
63 changes: 8 additions & 55 deletions openwisp_monitoring/monitoring/migrations/0005_migrate_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,24 @@

import logging

from django.core.exceptions import ObjectDoesNotExist
from django.db import migrations
from swapper import load_model

from .influxdb.influxdb_alter_structure_0006 import EXCLUDED_MEASUREMENTS
from .influxdb.influxdb_alter_structure_0006 import (
update_metric_timeseries_structure_forward_migration,
update_metric_timeseries_structure_reverse_migration,
)

CHUNK_SIZE = 1000

logger = logging.getLogger(__name__)


def forward_migrate_metric(metric_model, configuration, new_key):
metric_qs = metric_model.objects.filter(configuration=configuration).exclude(
key__in=EXCLUDED_MEASUREMENTS
)
updated_metrics = []
for metric in metric_qs.iterator(chunk_size=CHUNK_SIZE):
try:
extra_tags = {'organization_id': str(metric.content_object.organization_id)}
except ObjectDoesNotExist:
extra_tags = {}
metric.main_tags = {
'ifname': metric.key,
}
metric.extra_tags.update(extra_tags)
metric.key = new_key
updated_metrics.append(metric)
if len(updated_metrics) > CHUNK_SIZE:
metric_model.objects.bulk_update(
updated_metrics, fields=['main_tags', 'extra_tags', 'key']
)
updated_metrics = []
if updated_metrics:
metric_model.objects.bulk_update(
updated_metrics, fields=['main_tags', 'extra_tags', 'key']
)


def forward_migration(apps, schema_editor):
Metric = load_model('monitoring', 'Metric')
forward_migrate_metric(Metric, configuration='clients', new_key='wifi_clients')
forward_migrate_metric(Metric, configuration='traffic', new_key='traffic')


def reverse_migration(apps, schema_editor):
# Reverse migration is required because of the
# the unique together condition implemented in
# Metric model.
Metric = load_model('monitoring', 'Metric')
updated_metrics = []
for metric in Metric.objects.filter(key__in=['traffic', 'wifi_clients']).iterator(
chunk_size=CHUNK_SIZE
):
metric.key = metric.main_tags['ifname']
updated_metrics.append(metric)
if len(updated_metrics) > CHUNK_SIZE:
Metric.objects.bulk_update(updated_metrics, fields=['key'])
updated_metrics = []
if updated_metrics:
Metric.objects.bulk_update(updated_metrics, fields=['key'])


class Migration(migrations.Migration):
dependencies = [('monitoring', '0004_metric_main_and_extra_tags')]

operations = [
migrations.RunPython(forward_migration, reverse_code=reverse_migration)
migrations.RunPython(
update_metric_timeseries_structure_forward_migration,
reverse_code=update_metric_timeseries_structure_reverse_migration,
)
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Manually created

import logging

from django.db import migrations

from .influxdb.influxdb_alter_structure_0006 import (
update_metric_timeseries_structure_forward_migration,
update_metric_timeseries_structure_reverse_migration,
)

CHUNK_SIZE = 1000

logger = logging.getLogger(__name__)


def forward_migration(apps, schema_editor):
update_metric_timeseries_structure_forward_migration(apps, schema_editor)
from ..tasks import migrate_timeseries_database

migrate_timeseries_database.delay()


def reverse_migration(apps, schema_editor):
update_metric_timeseries_structure_reverse_migration(
apps, schema_editor, metric_keys=['signal']
)


class Migration(migrations.Migration):
dependencies = [('monitoring', '0011_alter_metric_field_name')]

operations = [
migrations.RunPython(
forward_migration,
reverse_code=reverse_migration,
)
]
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time

import requests
from django.core.exceptions import ObjectDoesNotExist
from influxdb.exceptions import InfluxDBServerError
from swapper import load_model

Expand Down Expand Up @@ -164,10 +165,36 @@ def migrate_wifi_clients():


def migrate_traffic_data():
migrate_influxdb_data(new_measurement='traffic', configuration='traffic')
migrate_influxdb_data(
new_measurement='traffic',
configuration='traffic',
delete_query=f"{DELETE_QUERY} AND access_tech != ''",
)
logger.info('"traffic" measurements successfully migrated.')


def migrate_signal_strength_data():
migrate_influxdb_data(
new_measurement='signal', configuration='signal_strength', delete_query=None
)
logger.info('"signal_strength" measurements successfully migrated.')


def migrate_signal_quality_data():
migrate_influxdb_data(
new_measurement='signal', configuration='signal_quality', delete_query=None
)
logger.info('"signal_quality" measurements successfully migrated.')


def migrate_access_tech_data():
migrate_influxdb_data(
new_measurement='signal',
configuration='access_tech',
)
logger.info('"access_tech" measurements successfully migrated.')


def requires_migration():
"""Indicates whether influxdb data migration is necessary.

Expand All @@ -190,4 +217,77 @@ def migrate_influxdb_structure():
return
migrate_wifi_clients()
migrate_traffic_data()
migrate_signal_strength_data()
migrate_signal_quality_data()
migrate_access_tech_data()
logger.info('Timeseries data migration completed.')


def _forward_migrate_metric(metric_model, configuration, new_key, add_org_tag=True):
metric_qs = metric_model.objects.filter(configuration=configuration).exclude(
key__in=EXCLUDED_MEASUREMENTS
)
updated_metrics = []
for metric in metric_qs.iterator(chunk_size=CHUNK_SIZE):
try:
assert add_org_tag is True
extra_tags = {'organization_id': str(metric.content_object.organization_id)}
except (AssertionError, ObjectDoesNotExist):
extra_tags = {}
metric.main_tags = {
'ifname': metric.key,
}
metric.extra_tags.update(extra_tags)
metric.key = new_key
updated_metrics.append(metric)
if len(updated_metrics) > CHUNK_SIZE:
metric_model.objects.bulk_update(
updated_metrics, fields=['main_tags', 'extra_tags', 'key']
)
updated_metrics = []
if updated_metrics:
metric_model.objects.bulk_update(
updated_metrics, fields=['main_tags', 'extra_tags', 'key']
)


def update_metric_timeseries_structure_forward_migration(apps, schema_editor):
"""
Updates metric objects to use static value for key and set
interface name in the main tags
"""
Metric = load_model('monitoring', 'Metric')
_forward_migrate_metric(Metric, configuration='clients', new_key='wifi_clients')
_forward_migrate_metric(Metric, configuration='traffic', new_key='traffic')
_forward_migrate_metric(
Metric, configuration='signal_strength', new_key='signal', add_org_tag=False
)
_forward_migrate_metric(
Metric, configuration='signal_quality', new_key='signal', add_org_tag=False
)
_forward_migrate_metric(
Metric, configuration='access_tech', new_key='signal', add_org_tag=False
)


def update_metric_timeseries_structure_reverse_migration(
apps, schema_editor, metric_keys=None
):
"""
Reverse migration is required because of the
the unique together condition implemented in
Metric model.
"""
Metric = load_model('monitoring', 'Metric')
metric_keys = metric_keys or ['traffic', 'wifi_clients', 'signal']
updated_metrics = []
for metric in Metric.objects.filter(
key__in=metric_keys,
).iterator(chunk_size=CHUNK_SIZE):
metric.key = metric.main_tags['ifname']
updated_metrics.append(metric)
if len(updated_metrics) > CHUNK_SIZE:
Metric.objects.bulk_update(updated_metrics, fields=['key'])
updated_metrics = []
if updated_metrics:
Metric.objects.bulk_update(updated_metrics, fields=['key'])