CV Manager mongoDB full support #54

Merged: 11 commits, Apr 1, 2024
2 changes: 0 additions & 2 deletions README.md
@@ -180,9 +180,7 @@ For the "Debug Solution" to run properly on Windows 10/11 using WSL, the followi

<b>API Variables</b>

- COUNTS_DB_TYPE: Set to either "MongoDB" or "BigQuery" depending on where the message counts are stored.
- COUNTS_MSG_TYPES: Set to a list of message types to include in counts query. Sample format is described in the sample.env.
- COUNTS_DB_NAME: The BigQuery table or MongoDB collection name where the RSU message counts are located.
- BSM_DB_NAME: The database name for BSM visualization data.
- SSM_DB_NAME: The database name for SSM visualization data.
- SRM_DB_NAME: The database name for SRM visualization data.
2 changes: 0 additions & 2 deletions docker-compose.yml
@@ -22,8 +22,6 @@ services:
MONGO_DB_NAME: ${MONGO_DB_NAME}

COUNTS_MSG_TYPES: ${COUNTS_MSG_TYPES}
COUNTS_DB_TYPE: ${COUNTS_DB_TYPE}
COUNTS_DB_NAME: ${COUNTS_DB_NAME}
GOOGLE_APPLICATION_CREDENTIALS: '/google/gcp_credentials.json'

BSM_DB_NAME: ${BSM_DB_NAME}
10 changes: 3 additions & 7 deletions resources/kubernetes/cv-manager-api.yaml
@@ -103,10 +103,6 @@ spec:
# Fill out the ENV vars with your own values
- name: CORS_DOMAIN
value: ''
- name: GOOGLE_APPLICATION_CREDENTIALS
value: ''
- name: GOOGLE_CLIENT_ID
value: ""
- name: KEYCLOAK_ENDPOINT
value: ""
- name: KEYCLOAK_REALM
@@ -132,11 +128,11 @@ spec:
secretKeyRef:
name: some-postgres-secret-password
key: some-postgres-secret-key
- name: COUNTS_DB_TYPE
- name: MONGO_DB_URI
value: ""
- name: COUNTS_MSG_TYPES
- name: MONGO_DB_NAME
value: ""
- name: COUNTS_DB_NAME
- name: COUNTS_MSG_TYPES
value: ""
- name: BSM_DB_NAME
value: ""
5 changes: 0 additions & 5 deletions sample.env
@@ -46,9 +46,6 @@ KEYCLOAK_DOMAIN=cvmanager.auth.com
GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET=

# Set to either "BIGQUERY" or "MONGODB" depending on message count location.
COUNTS_DB_TYPE="BIGQUERY"

# If "BIGQUERY", set the location of the GCP service account key attached as a volume
GOOGLE_APPLICATION_CREDENTIALS='./resources/google/sample_gcp_service_account.json'

@@ -57,9 +54,7 @@ MONGO_DB_URI=
MONGO_DB_NAME="ODE"

# Set these variables if using either "MONGODB" or "BIGQUERY"
# COUNTS_DB_NAME: Used for V2X message counts
# COUNTS_MSG_TYPES: Comma separated list of message types.
COUNTS_DB_NAME=
# COUNTS_MSG_TYPES must be set for the counts menu to correctly populate when building an image for deployment
COUNTS_MSG_TYPES='BSM,SSM,SPAT,SRM,MAP'
BSM_DB_NAME=
46 changes: 44 additions & 2 deletions services/addons/images/count_metric/README.md
@@ -1,6 +1,48 @@
# CDOT GCP Count Metric Generator
# Count Metric Service

This directory contains the script for monitoring a Pub/Sub topic for incoming messages and counting them according to the IP of the RSU they originated from (for an example message format, see the last section of this README). Each record is then stored as a custom metric in GCP Monitoring.
## Daily Emailer (MongoDB)

The count_metric service queries and processes the jpo-ode messages stored in MongoDB by your jpo-ode deployment environment. These processed messages are quantified and compiled into a message count summary email that breaks down all messages received over the past 24 hours, grouped by the RSU that forwarded the data. Both "in" and "out" counts are collected so that any data loss during processing within the jpo-ode can be detected. Any deviance greater than 5% will have its 'out' count marked red in the generated email; anything at or below 5% is marked green.

It is important to note that the count_metric service assumes Map and TIM messages are deduplicated on the 'out' counts, and it normalizes the deviance expectation to 1 unique Map or TIM per hour from an RSU.

The following message types are included: ["BSM", "TIM", "Map", "SPaT", "SRM", "SSM"]
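The red/green deviance rule described above can be sketched as follows. This is a minimal illustration of the documented behavior; the function name and threshold constant are illustrative and not part of the service's code:

```python
DEVIANCE_THRESHOLD = 0.05  # 'out' counts deviating more than 5% are flagged red

def out_count_color(msg_type, in_count, out_count):
    """Return 'green' when the out count is within 5% of expectation, else 'red'."""
    if msg_type in ("Map", "TIM"):
        # Map and TIM are assumed deduplicated on the way out, so the
        # expectation is normalized to 1 unique message per hour over 24 hours.
        expected = 24
    else:
        expected = in_count
    if expected == 0:
        return "green" if out_count == 0 else "red"
    deviance = abs(expected - out_count) / expected
    return "red" if deviance > DEVIANCE_THRESHOLD else "green"
```

With these assumptions, a BSM 'out' count of 90 against an 'in' count of 100 (10% deviance) would be marked red, while a Map 'out' count of 24 per day would be green regardless of the raw 'in' count.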

To run this service, the following environment variables must be set:

<b>LOGGING_LEVEL:</b> The logging level of the deployment. Options are: 'critical', 'error', 'warning', 'info', and 'debug'. If not specified, this defaults to 'info'. Refer to Python's documentation for more information: [Python logging](https://docs.python.org/3/howto/logging.html).

<b>ENABLE_EMAILER:</b> Set to 'True' to run the daily emailer or 'False' to use the now-deprecated Kafka message counter. It is recommended to switch to MongoDB if you are still using the message counter in any environment.

<b>DEPLOYMENT_TITLE:</b> The name of the environment that the jpo-ode messages are relevant to. This can be 'DEV', 'PROD', or anything suitable to your jpo-ode deployment.

<b>PG_DB_HOST:</b> The hostname or IP address used to connect to the PostgreSQL database.

<b>PG_DB_USER:</b> Postgres database username.

<b>PG_DB_PASS:</b> Postgres database password. Surround it in single quotes if it contains any special characters.

<b>PG_DB_NAME:</b> Postgres database name.

<b>MONGO_DB_URI:</b> The connection string URI for the MongoDB database; refer to the MongoDB [documentation](https://www.mongodb.com/docs/manual/reference/connection-string/).

<b>MONGO_DB_NAME:</b> MongoDB database name.

<b>SMTP_SERVER_IP:</b> The IP or domain of the SMTP server your organization uses. DOTs often have a self-hosted SMTP server for security reasons.

<b>SMTP_USERNAME:</b> The username for the SMTP server account.

<b>SMTP_PASSWORD:</b> The password for the SMTP server account.

<b>SMTP_EMAIL:</b> The origin email address that the count_metric service sends the email from. This is usually associated with the SMTP server's authentication.

<b>SMTP_EMAIL_RECIPIENTS:</b> A comma-delimited list of recipient email addresses.
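As a hedged sketch, the required variables listed above can be checked up front before the emailer starts. The variable names come from this README; the helper itself is illustrative and not part of the service:

```python
import os

# Environment variables the daily emailer needs, per this README.
REQUIRED_VARS = [
    "DEPLOYMENT_TITLE",
    "PG_DB_HOST", "PG_DB_USER", "PG_DB_PASS", "PG_DB_NAME",
    "MONGO_DB_URI", "MONGO_DB_NAME",
    "SMTP_SERVER_IP", "SMTP_USERNAME", "SMTP_PASSWORD",
    "SMTP_EMAIL", "SMTP_EMAIL_RECIPIENTS",
]

def missing_env_vars(environ=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]
```

Failing fast on an empty result from a check like this is friendlier than a `KeyError` deep inside the email-sending path.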

## Kafka JPO-ODE Message Counter

### NOTE: The following is now deprecated. It is recommended to use the emailer feature of this add-on service with the MongoDB deployment from the [jpo-conflictmonitor](https://github.com/usdot-jpo-ode/jpo-conflictmonitor/blob/develop/docker-compose.yml) repository alongside the jpo-ode. The old feature will be removed in the next release (Summer 2024).

This directory contains the script for monitoring a Kafka topic for incoming messages and counting them according to the IP of the RSU they originated from (for an example message format, see the last section of this README). Each record is then stored as a custom metric in GCP Monitoring.
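The per-RSU tally the counter performs can be sketched as below. The message shape (`metadata.originIp`) mirrors the example format referenced above, but this helper is illustrative rather than the service's actual implementation:

```python
from collections import Counter

def tally_by_origin_ip(messages):
    """Count messages grouped by the RSU IP that produced them."""
    counts = Counter()
    for msg in messages:
        # Messages without an origin IP are grouped under "Unknown"
        ip = msg.get("metadata", {}).get("originIp")
        counts[ip or "Unknown"] += 1
    return dict(counts)
```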

To run the script, the following environment variables must be set:

168 changes: 168 additions & 0 deletions services/addons/images/count_metric/daily_emailer.py
@@ -0,0 +1,168 @@
import os
import logging
import gen_email
from common.emailSender import EmailSender
import common.pgquery as pgquery
from datetime import datetime, timedelta
from pymongo import MongoClient

message_types = ["BSM", "TIM", "Map", "SPaT", "SRM", "SSM"]


# Modify the rsu_dict with the specified date range's mongoDB "in" counts for each message type
# The rsu_dict is modified in place
def query_mongo_in_counts(rsu_dict, start_dt, end_dt, mongo_db):
for type in message_types:
collection = mongo_db[f"OdeRawEncoded{type.upper()}Json"]
# Perform mongoDB aggregate query
agg_result = collection.aggregate(
[
{
"$match": {
"recordGeneratedAt": {
"$gte": start_dt,
"$lt": end_dt,
}
}
},
{
"$group": {
"_id": f"${type.title()}MessageContent.metadata.originRsu",
"count": {"$sum": 1},
}
},
]
)
for record in agg_result:
if not record["_id"]:
continue
rsu_ip = record["_id"][0]
count = record["count"]

logging.debug(f"{type.title()} In count received for {rsu_ip}: {count}")

# If a RSU that is not in PostgreSQL has counts recorded, add it to the rsu_dict and populate zeroes
if rsu_ip not in rsu_dict:
rsu_dict[rsu_ip] = {
"primary_route": "Unknown",
"counts": {},
}
for t in message_types:
rsu_dict[rsu_ip]["counts"][t] = {"in": 0, "out": 0}

rsu_dict[rsu_ip]["counts"][type]["in"] = count


# Modify the rsu_dict with the specified date range's mongoDB "out" counts for each message type
# The rsu_dict is modified in place
def query_mongo_out_counts(rsu_dict, start_dt, end_dt, mongo_db):
for type in message_types:
collection = mongo_db[f"Ode{type.title()}Json"]
# Perform mongoDB aggregate query
agg_result = collection.aggregate(
[
{
"$match": {
"recordGeneratedAt": {
"$gte": start_dt,
"$lt": end_dt,
}
}
},
{
"$group": {
"_id": f"$metadata.originIp",
"count": {"$sum": 1},
}
},
]
)
for record in agg_result:
if not record["_id"]:
continue
rsu_ip = record["_id"]
count = record["count"]

logging.debug(f"{type.title()} Out count received for {rsu_ip}: {count}")

# If a RSU that is not in PostgreSQL has counts recorded, add it to the rsu_dict and populate zeroes
if rsu_ip not in rsu_dict:
rsu_dict[rsu_ip] = {
"primary_route": "Unknown",
"counts": {},
}
for t in message_types:
rsu_dict[rsu_ip]["counts"][t] = {"in": 0, "out": 0}

rsu_dict[rsu_ip]["counts"][type]["out"] = count


def prepare_rsu_dict():
query = (
"SELECT to_jsonb(row) "
"FROM ("
"SELECT ipv4_address, primary_route "
"FROM public.rsus "
"ORDER BY primary_route ASC, milepost ASC"
") as row"
)

# Query PostgreSQL for the list of SNMP message forwarding configurations tracked in PostgreSQL
data = pgquery.query_db(query)

rsu_dict = {}
for row in data:
row = dict(row[0])
rsu_dict[row["ipv4_address"]] = {
"primary_route": row["primary_route"],
"counts": {},
}
for type in message_types:
rsu_dict[row["ipv4_address"]]["counts"][type] = {"in": 0, "out": 0}
logging.debug(f"Created RSU dictionary: {rsu_dict}")

return rsu_dict


def email_daily_counts(email_body):
logging.info("Attempting to send the count emails...")
try:
email_addresses = os.environ["SMTP_EMAIL_RECIPIENTS"].split(",")

for email_address in email_addresses:
emailSender = EmailSender(
os.environ["SMTP_SERVER_IP"],
587,
)
emailSender.send(
sender=os.environ["SMTP_EMAIL"],
recipient=email_address,
subject=f"{str(os.environ['DEPLOYMENT_TITLE']).upper()} Counts",
message=email_body,
replyEmail="",
username=os.environ["SMTP_USERNAME"],
password=os.environ["SMTP_PASSWORD"],
pretty=True,
)
except Exception as e:
logging.error(e)


def run_daily_emailer():
rsu_dict = prepare_rsu_dict()

# Grab today's date and yesterday's date for a 24 hour range
start_dt = (datetime.now() - timedelta(1)).replace(
hour=0, minute=0, second=0, microsecond=0
)
end_dt = (datetime.now()).replace(hour=0, minute=0, second=0, microsecond=0)

client = MongoClient(os.getenv("MONGO_DB_URI"))
mongo_db = client[os.getenv("MONGO_DB_NAME")]
# Populate rsu_dict with counts from mongoDB
query_mongo_in_counts(rsu_dict, start_dt, end_dt, mongo_db)
query_mongo_out_counts(rsu_dict, start_dt, end_dt, mongo_db)

# Generate the email content with the populated rsu_dict
email_body = gen_email.generate_email_body(rsu_dict, start_dt, end_dt)
email_daily_counts(email_body)
12 changes: 0 additions & 12 deletions services/addons/images/count_metric/docker-compose.yml

This file was deleted.

29 changes: 27 additions & 2 deletions services/addons/images/count_metric/driver.py
@@ -2,8 +2,10 @@
import copy
import threading
import logging
import daily_emailer
import common.pgquery as pgquery

from apscheduler.schedulers.background import BackgroundScheduler
from kafka_counter import KafkaMessageCounter

# Set based on project and subscription, set these outside of the script if deployed
@@ -46,7 +48,7 @@ def populateRsuDict():
rsu_count_dict["Unknown"] = {}


def run():
def run_counter():
# Pull list of message types to run counts for from environment variable
messageTypesString = os.getenv("MESSAGE_TYPES", "")
if messageTypesString == "":
@@ -98,5 +100,28 @@ def run():
logging.debug("Closed thread")


def init_background_daily_emailer_task():
logging.info("Initiating daily counts emailer background task scheduler...")
# Run scheduler for async daily counts emails
scheduler = BackgroundScheduler({"apscheduler.timezone": "UTC"})
scheduler.add_job(daily_emailer.run_daily_emailer, "cron", hour=7)
scheduler.start()


if __name__ == "__main__":
run()
log_level = (
"INFO" if "LOGGING_LEVEL" not in os.environ else os.environ["LOGGING_LEVEL"]
)
logging.basicConfig(format="%(levelname)s:%(message)s", level=log_level)

logging.info("driver started")
# Emailer does not work with counts generated from this service, only with the ODE counts in mongoDB
if os.environ["ENABLE_EMAILER"].lower() == "true":
init_background_daily_emailer_task()
# Keeps the script going so it can run as a Dockerized container.
# Yes, this sucks. We will switch this all to cron once we can remove the deprecated counter
while True:
continue
# This counter is deprecated and will be losing support in the next release. Use the ODE with mongoDB
else:
run_counter()