Skip to content

Commit

Permalink
Ported data posting for dev envs to protobuf
Browse files Browse the repository at this point in the history
This uses the Protocol Buffer bindings with the definition provided by
the https://github.com/pulp/analytics.pulpproject.org repo.

[noissue]
  • Loading branch information
bmbouter committed Aug 19, 2022
1 parent 9014220 commit a4b8e0b
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[flake8]
exclude = ./docs/*,*/migrations/*
exclude = ./docs/*,*/migrations/*,./pulpcore/app/protobuf/*
ignore = E203,W503,Q000,Q003,D100,D104,D106,D200,D205,D400,D401,D402
max-line-length = 100

Expand Down
Empty file.
67 changes: 67 additions & 0 deletions pulpcore/app/protobuf/telemetry_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 28 additions & 42 deletions pulpcore/app/tasks/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import asyncio
import json
import logging

import aiohttp
import async_timeout

from asgiref.sync import sync_to_async
from google.protobuf.json_format import MessageToJson

from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app.util import get_telemetry_posting_url
from pulpcore.app.models import SystemID
from pulpcore.app.models.status import ContentAppStatus
from pulpcore.app.models.task import Worker
from pulpcore.app.protobuf.telemetry_pb2 import Telemetry


logger = logging.getLogger(__name__)
Expand All @@ -24,70 +27,53 @@ async def _num_hosts(qs):
return len(hosts)


async def _versions_data():
versions = []

async def _versions_data(telemetry):
for app in pulp_plugin_configs():
versions.append({"component": app.label, "version": app.version})

return {"versions": versions}

new_component = telemetry.components.add()
new_component.name = app.label
new_component.version = app.version

async def _online_content_apps_data():
online_content_apps = ContentAppStatus.objects.online()
online_content_apps_processes = await sync_to_async(online_content_apps.count)()
online_content_apps_hosts = await _num_hosts(online_content_apps)

return {
"online_content_apps": {
"processes": online_content_apps_processes,
"hosts": online_content_apps_hosts,
},
}
async def _online_content_apps_data(telemetry):
online_content_apps_qs = ContentAppStatus.objects.online()
telemetry.online_content_apps.processes = await sync_to_async(online_content_apps_qs.count)()
telemetry.online_content_apps.hosts = await _num_hosts(online_content_apps_qs)


async def _online_workers_data():
online_workers = Worker.objects.online_workers()
online_workers_processes = await sync_to_async(online_workers.count)()
online_workers_hosts = await _num_hosts(online_workers)
async def _online_workers_data(telemetry):
online_workers_qs = Worker.objects.online_workers()
telemetry.online_workers.processes = await sync_to_async(online_workers_qs.count)()
telemetry.online_workers.hosts = await _num_hosts(online_workers_qs)

return {
"online_workers": {
"processes": online_workers_processes,
"hosts": online_workers_hosts,
},
}


async def _system_id():
system_id_entry = await sync_to_async(SystemID.objects.get)()
return {"system_id": str(system_id_entry.pk)}
async def _system_id(telemetry):
system_id_obj = await sync_to_async(SystemID.objects.get)()
telemetry.system_id = str(system_id_obj.pk)


async def post_telemetry():
url = get_telemetry_posting_url()
data = {}

telemetry = Telemetry()

awaitables = (
_system_id(),
_versions_data(),
_online_content_apps_data(),
_online_workers_data(),
_system_id(telemetry),
_versions_data(telemetry),
_online_content_apps_data(telemetry),
_online_workers_data(telemetry),
)

data_iterable_to_merge = await asyncio.gather(*awaitables)
for data_to_merge in data_iterable_to_merge:
data.update(data_to_merge)
await asyncio.gather(*awaitables)

try:
async with aiohttp.ClientSession() as session:
async with async_timeout.timeout(30):
async with session.post(url, json=data) as resp:
async with async_timeout.timeout(300):
async with session.post(url, data=telemetry.SerializeToString()) as resp:
if resp.status == 200:
logger.info(
("Submitted telemetry to %s. " "Information submitted includes %s"),
url,
data,
json.loads(MessageToJson(telemetry)),
)
else:
logger.warning(
Expand Down
4 changes: 2 additions & 2 deletions pulpcore/app/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
# a little cache so viewset_for_model doesn't have iterate over every app every time
_model_viewset_cache = {}

PRODUCTION_URL = "https://analytics-pulpproject-org.pulpproject.workers.dev/"
DEV_URL = "https://dev-analytics-pulpproject-org.pulpproject.workers.dev/"
PRODUCTION_URL = "https://analytics.pulpproject.org/"
DEV_URL = "https://dev.analytics.pulpproject.org/"


def get_url(model):
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ exclude = '''
| dist
| migrations
| docs
| protobuf
)/
'''

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dynaconf~=3.1.9
gunicorn~=20.1.0
jinja2~=3.1
naya~=1.1.1
protobuf<3.21
pygtrie~=2.5.0
psycopg2~=2.9.3
PyYAML>=5.1.1,<6.1.0
Expand Down

0 comments on commit a4b8e0b

Please sign in to comment.