
Commit

No issue: Correct integration tests on MDFParser component (hashicorp…#144)

* No issue: Correct integration tests on MDFParser component

In this commit the integration tests of the MDFParser component were corrected.
For the test correction, the following was done:
* The code responsible for loading the service configuration was refactored in order to abstract how the config is loaded. This was needed for testing (a minimal sketch follows below).
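
A minimal sketch of the resulting flow, assuming the module paths shown in the diff below (the config path is illustrative):

from mdfparser.main import MdfParserConfig, main

# Config loading is abstracted behind a dedicated helper, so tests can build
# an MdfParserConfig directly instead of patching file reads inside main().
config = MdfParserConfig.load_config_from_yaml_file('/app/config/config.yml')  # illustrative path
main(config=config)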

Co-authored-by: Michael Krebs <michael.krebs@bosch.io>
nuno407 and michaelkrebs94 authored Sep 27, 2022
1 parent 07c2595 commit 8fa6f9f
Showing 18 changed files with 339 additions and 261 deletions.
23 changes: 23 additions & 0 deletions MDFParser/src/config.py
@@ -0,0 +1,23 @@
from dataclasses import dataclass, fields
import yaml


@dataclass
class MdfParserConfig():
input_queue: str
metadata_output_queue: str

@staticmethod
def load_config_from_yaml_file(path) -> 'MdfParserConfig':
"""Loads yaml file into MdfParserConfig object. Extra yaml fields are ignored.
Args:
path (str): path of the yaml file containing the config
Returns:
MdfParserConfig: MdfParserConfig object containing passed yaml config
"""
with open(path, 'r') as configfile:
# We should ignore extra fields
field_names = set([f.name for f in fields(MdfParserConfig)])
return MdfParserConfig(**{key: value for key, value in yaml.load(configfile, yaml.SafeLoader).items() if key in field_names})
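
A quick usage sketch of the loader above, assuming the module is importable as mdfparser.config; the file name is illustrative and mirrors the extra-field fixture added further down in this diff:

import yaml
from mdfparser.config import MdfParserConfig

# Write a config that carries one field the dataclass does not declare.
with open('config_with_extra_field.yml', 'w') as configfile:  # illustrative name
    yaml.safe_dump({
        'input_queue': 'dev-terraform-queue-mdf-parser',
        'metadata_output_queue': 'dev-terraform-queue-metadata',
        'extra_metadata_output_queue': 'test',
    }, configfile)

config = MdfParserConfig.load_config_from_yaml_file('config_with_extra_field.yml')
assert config.input_queue == 'dev-terraform-queue-mdf-parser'
assert not hasattr(config, 'extra_metadata_output_queue')  # extra field was dropped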
2 changes: 1 addition & 1 deletion MDFParser/src/downloader.py
@@ -1,6 +1,6 @@
import json
import logging
from .s3_interaction import S3Interaction
from mdfparser.s3_interaction import S3Interaction

from baseaws.shared_functions import ContainerServices

131 changes: 67 additions & 64 deletions MDFParser/src/main.py
@@ -1,33 +1,65 @@
import json
import logging
import os
import re
import os
from dataclasses import dataclass, fields
from typing import Any, TypedDict, cast
import boto3
import yaml

from baseaws.shared_functions import ContainerServices, GracefulExit
from .downloader import Downloader
from .uploader import Uploader
from .synchronizer import Synchronizer
from .processor import Processor
from .chc_counter import ChcCounter
from mdfparser.downloader import Downloader
from mdfparser.uploader import Uploader
from mdfparser.synchronizer import Synchronizer
from mdfparser.processor import Processor
from mdfparser.chc_counter import ChcCounter

CONTAINER_NAME = "MDFParser" # Name of the current container
CONTAINER_VERSION = "v1.0" # Version of the current container

_logger: logging.Logger


def main():
# External configuration that can be configured as kubernetes secret
config: MdfParserConfig
with open(os.environ.get('CONFIG_FILE', '/app/config/config.yml'), 'r') as configfile:
config = cast(MdfParserConfig, yaml.load(configfile, yaml.SafeLoader))
class InvalidFileNameException(Exception):
def __init__(self, *args: object) -> None:
super().__init__(*args)

_logger.info("Starting Container %s (%s)..\n",
CONTAINER_NAME, CONTAINER_VERSION)

class NoProcessingSuccessfulException(Exception):
def __init__(self, *args: object) -> None:
super().__init__(*args)


@dataclass
class MdfParserConfig():
input_queue: str
metadata_output_queue: str

@staticmethod
def load_config_from_yaml_file(path) -> 'MdfParserConfig':
"""Loads yaml file into MdfParserConfig object. Extra yaml fields are ignored.
Args:
path (str): path of the yaml file containing the config
Returns:
MdfParserConfig: MdfParserConfig object containing passed yaml config
"""
with open(path, 'r') as configfile:
# We should ignore extra fields
field_names = set([f.name for f in fields(MdfParserConfig)])
return MdfParserConfig(**{key: value for key, value in yaml.load(configfile, yaml.SafeLoader).items() if key in field_names})


class InputMessage(TypedDict):
_id: str
s3_path: str


def main(config: MdfParserConfig):

_logger.info("Starting Container %s (%s)..\n", CONTAINER_NAME,CONTAINER_VERSION)

# Logic classes
downloader = Downloader()
uploader = Uploader()
@@ -36,42 +68,34 @@ def main():
processors: list[Processor] = [chc_counter]

# AWS clients for container_services
sqs_client = boto3.client('sqs', region_name='eu-central-1')
container_services = ContainerServices(
container=CONTAINER_NAME, version=CONTAINER_VERSION)
sqs_client = boto3.client('sqs',region_name='eu-central-1')
container_services = ContainerServices(container=CONTAINER_NAME,version=CONTAINER_VERSION)

# allow graceful exit
graceful_exit = GracefulExit()

while(graceful_exit.continue_running):
_logger.debug('Listening to input queue.')
message = container_services.listen_to_input_queue(
sqs_client, config['input_queue'])
message = container_services.listen_to_input_queue(sqs_client, config.input_queue)
if message:
# check message has the required fields
if not('Body' in message and '_id' in message['Body'] and 's3_path' in message['Body']):
_logger.warning('Required fields are not in the message.')
continue

message_body = cast(InputMessage, json.loads(
message['Body'].replace("\'", "\"")))
message_body = cast(InputMessage, json.loads(message['Body'].replace("\'", "\"")))
_logger.debug('Processing recording entry %s', message_body['_id'])
try:
metadata: dict[str, Any] = {
'_id': message_body['_id'],
}
metadata.update(process_request(
message_body['s3_path'], downloader, uploader, synchronizer, processors))
container_services.send_message(
sqs_client, config['metadata_output_queue'], metadata)
container_services.delete_message(
sqs_client, message['ReceiptHandle'], config['input_queue'])
metadata.update(process_request(message_body['s3_path'], downloader, uploader, synchronizer, processors))
container_services.send_message(sqs_client, config.metadata_output_queue, metadata)
container_services.delete_message(sqs_client, message['ReceiptHandle'], config.input_queue)
except Exception:
_logger.exception(
'Error during processing of request for %s', message_body['_id'])

_logger.exception('Error during processing of request for %s', message_body['_id'])

def process_request(mdf_s3_path: str, downloader: Downloader, uploader: Uploader, synchronizer: Synchronizer, processors: list[Processor]) -> dict[str, Any]:
def process_request(mdf_s3_path: str, downloader: Downloader, uploader: Uploader, synchronizer: Synchronizer, processors: list[Processor])->dict[str, Any]:
_logger.info('Starting processing of metadata for %s', mdf_s3_path)

# download and synchronize metadata
@@ -92,59 +116,38 @@ def process_request(mdf_s3_path: str, downloader: Downloader, uploader: Uploader
# we do not want the entire recording to fail for a specific processing only
_logger.exception('Error processing metadata.')
if(successful_processings == 0):
raise NoProcessingSuccessfulException(
'Not a single processing succeeded, therefore not updating metadata.')
raise NoProcessingSuccessfulException('Not a single processing succeeded, therefore not updating metadata.')

_logger.info('Successfully processed metadata for %s', mdf_s3_path)

# upload synchronized signals to s3 and store path in metadata
try:
metadata['signals_file'] = uploader.upload_signals(
synchronized, mdf_s3_path)
# added because Metadata expects this field to exist
metadata["s3_path"] = metadata['signals_file']['bucket'] + \
"/"+metadata['signals_file']["key"]
except Exception:
_logger.exception('Error uploading synchronized signals to S3.')
raise

return metadata


def extract_timestamps(filepath: str) -> tuple[int, int]:

def extract_timestamps(filepath: str)->tuple[int, int]:
match = re.search(r"_(\d{13,})_(\d{13,})_", filepath)
if not match or len(match.groups()) < 2:
raise InvalidFileNameException(
'Cannot extract timestamps from filepath "' + filepath + '".')
if not match or len(match.groups())<2:
raise InvalidFileNameException('Cannot extract timestamps from filepath "' + filepath + '".')
timestamp_from = int(match.group(1))
timestamp_to = int(match.group(2))
return timestamp_from, timestamp_to


class InvalidFileNameException(Exception):
def __init__(self, *args: object) -> None:
super().__init__(*args)


class NoProcessingSuccessfulException(Exception):
def __init__(self, *args: object) -> None:
super().__init__(*args)


class MdfParserConfig(TypedDict):
input_queue: str
metadata_output_queue: str
log_level: str


class InputMessage(TypedDict):
_id: str
s3_path: str


if __name__ == '__main__':
# Define configuration for logging messages
_logger = ContainerServices.configure_logging('mdfparser')
main()

## Generating dependencies for dependency injection
##
# External configuration that can be configured as kubernetes secret
config = MdfParserConfig.load_config_from_yaml_file(os.environ.get('CONFIG_FILE', '/app/config/config.yml'))

# Instantiating main loop and injecting dependencies
main(config=config)
else:
_logger = logging.getLogger('mdfparser')
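
For reference, a small sketch of the filename convention extract_timestamps relies on, using the timestamps from the integration-test fixtures below:

from mdfparser.main import extract_timestamps

# Recording names embed two epoch-millisecond timestamps (13+ digits each),
# which the regex in extract_timestamps picks out.
path = 's3://bucket/tenant_recording_1659962815000_1659962819000_metadata_full.json'
start, end = extract_timestamps(path)
assert (start, end) == (1659962815000, 1659962819000)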
@@ -10,4 +10,4 @@
"bucket": "bucket",
"key": "tenant_recording_1659962815000_1659962819000_signals.json"
}
}
}
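
The expected-update fixture above pairs with the s3_path assembly shown in main.py: the update sent to the metadata queue carries both the structured signals_file reference and a flattened path. Roughly, with the values from the fixture:

signals_file = {'bucket': 'bucket', 'key': 'tenant_recording_1659962815000_1659962819000_signals.json'}
s3_path = signals_file['bucket'] + '/' + signals_file['key']
# -> 'bucket/tenant_recording_1659962815000_1659962819000_signals.json'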
55 changes: 23 additions & 32 deletions MDFParser/src/tests/integration/test_mdfparser.py
@@ -1,22 +1,19 @@
import json
import logging
import os
from unittest.mock import ANY, Mock, PropertyMock, patch

from pytest import LogCaptureFixture, fixture
from pytest_mock import MockerFixture
import mdfparser.main
from mdfparser.main import main, MdfParserConfig

__location__ = os.path.realpath(
os.path.join(os.getcwd(), os.path.dirname(__file__)))

recording_from = 1659962815000
recording_to = 1659962819000
recording_name = 'tenant_recording_' + \
str(recording_from) + '_' + str(recording_to)
recording_name = 'tenant_recording_' + str(recording_from) + '_' + str(recording_to)
s3_path = 's3://bucket/' + recording_name + '_metadata_full.json'


class TestMain:
@fixture
def boto3_mock(self, mocker: MockerFixture):
@@ -25,12 +22,9 @@ def boto3_mock(self, mocker: MockerFixture):

@fixture
def container_services_mock(self, mocker: MockerFixture, boto3_mock: Mock) -> Mock:
container_services_mock = mocker.patch(
'mdfparser.main.ContainerServices', autospec=True)
mocker.patch('mdfparser.downloader.ContainerServices',
container_services_mock)
mocker.patch('mdfparser.uploader.ContainerServices',
container_services_mock)
container_services_mock = mocker.patch('mdfparser.main.ContainerServices', autospec=True)
mocker.patch('mdfparser.downloader.ContainerServices', container_services_mock)
mocker.patch('mdfparser.uploader.ContainerServices', container_services_mock)
return container_services_mock

@fixture
@@ -39,8 +33,12 @@ def graceful_exit_mock(self, mocker: MockerFixture) -> Mock:
continue_running_mock = PropertyMock(side_effect=[True, False])
type(graceful_exit_mock.return_value).continue_running = continue_running_mock
return continue_running_mock

@fixture
def mdf_parser_config(self) -> MdfParserConfig:
return MdfParserConfig(input_queue='dev-terraform-queue-mdf-parser', metadata_output_queue='dev-terraform-queue-metadata')

def test_mdf_parsing(self, mocker: MockerFixture, container_services_mock: Mock, graceful_exit_mock: PropertyMock):
def test_mdf_parsing(self, mocker: MockerFixture, container_services_mock: Mock, graceful_exit_mock: PropertyMock, mdf_parser_config: MdfParserConfig):
### GIVEN ###
# data preparation
with open(os.path.join(__location__, 'test_data/mdf_synthetic.json'), 'r') as f:
@@ -60,24 +58,20 @@ def test_mdf_parsing(self, mocker: MockerFixture, container_services_mock: Mock,
container_services_mock.download_file.return_value = mdf_data_encoded

### WHEN ###
mdfparser.main.main()
main(config=mdf_parser_config)

### THEN ###
with open(os.path.join(__location__, 'test_data/recording_update_expected.json'), 'r') as f:
expected_update = json.loads(f.read())
with open(os.path.join(__location__, 'test_data/sync_expected.json'), 'r') as f:
expected_sync = json.dumps(json.loads(
f.read().encode('utf-8'))).encode('utf-8')

container_services_mock.return_value.send_message.assert_called_once_with(
ANY, 'dev-terraform-queue-metadata', expected_update)
container_services_mock.return_value.delete_message.assert_called_once_with(
ANY, message['ReceiptHandle'], 'dev-terraform-queue-mdf-parser')
container_services_mock.upload_file.assert_called_once_with(
ANY, expected_sync, 'bucket', recording_name + '_signals.json')
expected_sync = json.dumps(json.loads(f.read().encode('utf-8'))).encode('utf-8')

container_services_mock.return_value.send_message.assert_called_once_with(ANY, 'dev-terraform-queue-metadata', expected_update)
container_services_mock.return_value.delete_message.assert_called_once_with(ANY, message['ReceiptHandle'], 'dev-terraform-queue-mdf-parser')
container_services_mock.upload_file.assert_called_once_with(ANY, expected_sync, 'bucket', recording_name + '_signals.json')
assert(graceful_exit_mock.call_count == 2)

def test_mdf_parsing_invalid_path(self, mocker: MockerFixture, container_services_mock: Mock, graceful_exit_mock, caplog: LogCaptureFixture):
def test_mdf_parsing_invalid_path(self, mocker: MockerFixture, container_services_mock: Mock, graceful_exit_mock, caplog: LogCaptureFixture, mdf_parser_config: MdfParserConfig):
### GIVEN ###
message = {
'Body': json.dumps({
@@ -91,17 +85,16 @@ def test_mdf_parsing_invalid_path(self, mocker: MockerFixture, container_service
container_services_mock.return_value.listen_to_input_queue.return_value = message

### WHEN ###
mdfparser.main.main()
main(config=mdf_parser_config)

### THEN ###
assert(
'Invalid MDF path' in rec.message for rec in caplog.records if rec.levelname == 'ERROR')
assert('Invalid MDF path' in rec.message for rec in caplog.records if rec.levelname == 'ERROR')
container_services_mock.return_value.send_message.assert_not_called()
container_services_mock.return_value.delete_message.assert_not_called()
container_services_mock.upload_file.assert_not_called()
assert(graceful_exit_mock.call_count == 2)

def test_mdf_parsing_download_error(self, mocker: MockerFixture, container_services_mock: Mock, graceful_exit_mock, caplog: LogCaptureFixture):
def test_mdf_parsing_download_error(self, mocker: MockerFixture, container_services_mock: Mock, graceful_exit_mock, caplog: LogCaptureFixture, mdf_parser_config: MdfParserConfig):
### GIVEN ###
message = {
'Body': json.dumps({
@@ -114,15 +107,13 @@ def test_mdf_parsing_download_error(self, mocker: MockerFixture, container_servi
# input queue mocks
container_services_mock.return_value.listen_to_input_queue.return_value = message
# downloader mocks
container_services_mock.download_file.side_effect = Exception(
'Download error')
container_services_mock.download_file.side_effect = Exception('Download error')

### WHEN ###
mdfparser.main.main()
main(config=mdf_parser_config)

### THEN ###
assert(
'Download error' in rec.message for rec in caplog.records if rec.levelname == 'ERROR')
assert('Download error' in rec.message for rec in caplog.records if rec.levelname == 'ERROR')
container_services_mock.return_value.send_message.assert_not_called()
container_services_mock.return_value.delete_message.assert_not_called()
container_services_mock.upload_file.assert_not_called()
@@ -0,0 +1,3 @@
input_queue: dev-terraform-queue-mdf-parser
metadata_output_queue: dev-terraform-queue-metadata
extra_metadata_output_queue: test
@@ -0,0 +1,2 @@
input_queue: dev-terraform-queue-mdf-parser
metadata_output_queue: dev-terraform-queue-metadata
@@ -0,0 +1 @@
input_queue: dev-terraform-queue-mdf-parser
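
The three YAML fixtures above (one with an extra field, one complete, one missing metadata_output_queue) suggest unit tests for the config loader along these lines; the fixture file names are hypothetical, since this diff does not show their paths:

import pytest
from mdfparser.main import MdfParserConfig

def test_extra_field_is_ignored():
    # Hypothetical fixture name for the yaml carrying extra_metadata_output_queue.
    config = MdfParserConfig.load_config_from_yaml_file('config_with_extra_field.yml')
    assert config.metadata_output_queue == 'dev-terraform-queue-metadata'
    assert not hasattr(config, 'extra_metadata_output_queue')

def test_missing_required_field_fails():
    # A yaml without metadata_output_queue cannot populate the dataclass.
    with pytest.raises(TypeError):
        MdfParserConfig.load_config_from_yaml_file('config_missing_field.yml')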
4 changes: 2 additions & 2 deletions MDFParser/src/tests/test_chc_counter.py
@@ -1,7 +1,7 @@
from datetime import timedelta

from pytest import fixture
from chc_counter import ChcCounter
from mdfparser.chc_counter import ChcCounter

class TestChcCounter:
@fixture
@@ -111,4 +111,4 @@ def test_process_chc_not_present_at_all(self, chc_counter: ChcCounter, chc_not_p
# THEN
recording_overview = result['recording_overview']
assert(recording_overview['number_chc_events'] == 0)
assert(recording_overview['chc_duration'] == 0.0)
assert(recording_overview['chc_duration'] == 0.0)
