From 79698b42117b26b2380013ecb23942550134c104 Mon Sep 17 00:00:00 2001 From: Oleksii Olchedai <103605026+RenckelAlexOlche@users.noreply.github.com> Date: Fri, 18 Oct 2024 13:23:15 +0300 Subject: [PATCH] add `HTTPStrategy` and `AbstractStrategy` (#84) * add `HTTPStrategy` and `AbstractStrategy` * fix the code according to the review * move the constants to the 'constants.py' file * add RabbitMQStrategy, and move same logic to the Abstract class * Add _LOG to HTTPStrategy * Move signature methors to another place * rework http strategy to http connection * fix 'post_process_request' method * change version * Hide cryptography imports * debug and test * fix changelog * merge HTTPConfig, HTTPTransport into MaestroHTTPConfig, MaestroHTTPTransport * merge MaestroHTTPConfig into MaestroHTTPTransport * add typing * Add tests to build_* * Return old imports * Add minor fixes * Update changelog * Return old variables --------- Co-authored-by: Dmytro Afanasiev --- CHANGELOG.md | 5 +- modular_sdk/commons/__init__.py | 65 +++++ modular_sdk/commons/constants.py | 6 + modular_sdk/connections/rabbit_connection.py | 15 +- .../impl/maestro_http_transport_service.py | 164 ++++++++++++ .../impl/maestro_rabbit_transport_service.py | 236 ++++-------------- .../impl/maestro_signature_builder.py | 89 +++++++ .../services/rabbit_transport_service.py | 71 ++++-- modular_sdk/utils/runtime_tracer/generic.py | 4 +- pyproject.toml | 6 +- tests/test_maestro_signature_builder.py | 71 ++++++ tests/test_utils.py | 30 ++- 12 files changed, 538 insertions(+), 224 deletions(-) create mode 100644 modular_sdk/services/impl/maestro_http_transport_service.py create mode 100644 modular_sdk/services/impl/maestro_signature_builder.py create mode 100644 tests/test_maestro_signature_builder.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a009ff..ca4bed6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [6.2.0] - 2024-10-04 +- added `MaestroHTTPTransport` to allow to interact with maestro using https protocol + ## [6.1.1] - 2024-10-04 - added `MODULAR_SDK_HTTP_PROXY` and `MODULAR_SDK_HTTPS_PROXY` envs @@ -17,7 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Dump lib version to `pynamodb==5.3.2` and `dynamodb-json~=1.3` ## [6.0.1b1] - 2024-09-11 -- fix rabbitmq bug when a wrong message was consumed aster pushing +- fix rabbitmq bug when a wrong message was consumed after pushing ## [6.0.0] - 2024-08-20 - Split `SIEM_DEFECT_DOJO` Parent type into: diff --git a/modular_sdk/commons/__init__.py b/modular_sdk/commons/__init__.py index 5c6c048..7429f94 100644 --- a/modular_sdk/commons/__init__.py +++ b/modular_sdk/commons/__init__.py @@ -1,5 +1,8 @@ +import base64 import copy import dataclasses +import gzip +import json import warnings from functools import partial from uuid import uuid4 @@ -35,6 +38,7 @@ def deprecated_func(*args, **kwargs): return deprecated_decorator +# todo remove with major release @deprecated('not a part of the lib') def build_response(content, code=200): if code == RESPONSE_OK_CODE: @@ -91,6 +95,67 @@ def generate_id(): return str(uuid4()) +def generate_id_hex(): + return str(uuid4().hex) + + +def build_secure_message( + request_id: str, + command_name: str, + parameters_to_secure: dict, + secure_parameters: list[str] | None = None, + is_flat_request: bool = False, +) -> list[dict] | str: + if not secure_parameters: + secure_parameters = [] + secured_parameters = { + k: (v if k not in secure_parameters else '*****') + for k, v in parameters_to_secure.items() + } + return build_message( + request_id=request_id, + command_name=command_name, + parameters=secured_parameters, + is_flat_request=is_flat_request, + ) + + +def build_message( + request_id: str, + command_name: str, + parameters: list[dict] | dict, + is_flat_request: bool = False, + compressed: bool = False, +) -> list[dict] | str: + if isinstance(parameters, list): + result = [] + for payload in parameters: + result.extend( + build_payload(request_id, command_name, payload, is_flat_request) + ) + else: + result = \ + build_payload(request_id, command_name, parameters, is_flat_request) + if compressed: + return base64 \ + .b64encode(gzip.compress(json.dumps(result, separators=(',', ':')).encode('UTF-8'))).decode() + return result + + +def build_payload( + request_id: str, + command_name: str, + parameters: dict, + is_flat_request: bool, +) -> list[dict]: + if is_flat_request: + parameters.update({'type': command_name}) + result = [{'id': request_id, 'type': None, 'params': parameters}] + else: + result = [{'id': request_id, 'type': command_name,'params': parameters}] + return result + + def default_instance(value, _type: type, *args, **kwargs): return value if isinstance(value, _type) else _type(*args, **kwargs) diff --git a/modular_sdk/commons/constants.py b/modular_sdk/commons/constants.py index bad1d1d..43d0fec 100644 --- a/modular_sdk/commons/constants.py +++ b/modular_sdk/commons/constants.py @@ -319,3 +319,9 @@ def iter(cls): JOB_SUCCESS_STATE = 'SUCCESS' JOB_FAIL_STATE = 'FAIL' JOB_RUNNING_STATE = 'RUNNING' + +PLAIN_CONTENT_TYPE = 'text/plain' +SUCCESS_STATUS = 'SUCCESS' +ERROR_STATUS = 'FAILED' +RESULTS = 'results' +DATA = 'data' diff --git a/modular_sdk/connections/rabbit_connection.py b/modular_sdk/connections/rabbit_connection.py index c80e3ef..e024c7a 100644 --- a/modular_sdk/connections/rabbit_connection.py +++ b/modular_sdk/connections/rabbit_connection.py @@ -5,13 +5,14 @@ from modular_sdk.commons.log_helper import get_logger RABBIT_DEFAULT_RESPONSE_TIMEOUT = 30 -_LOG = get_logger('modular_sdk-rabbit_connection') +_LOG = get_logger(__name__) class RabbitMqConnection: - def __init__(self, connection_url: str, timeout: int): + def __init__(self, connection_url: str, + timeout: int = RABBIT_DEFAULT_RESPONSE_TIMEOUT): self.connection_url = connection_url - self.timeout = timeout or RABBIT_DEFAULT_RESPONSE_TIMEOUT + self.timeout = timeout self.responses = {} def _open_channel(self): @@ -26,7 +27,7 @@ def _open_channel(self): code=502, content=f'Connection to RabbitMQ refused: {error_msg}' ) - def _close(self): + def _close(self) -> None: try: if self.conn.is_open: self.conn.close() @@ -38,8 +39,8 @@ def publish( message: str, routing_key: str, exchange: str = '', - headers: dict = None, - content_type: str = None, + headers: dict | None = None, + content_type: str | None = None, ) -> None: _LOG.debug(f'Request queue: {routing_key}') channel = self._open_channel() @@ -81,7 +82,7 @@ def __basic_publish( def publish_sync( self, - message: str, + message: str | bytes, routing_key: str, correlation_id: str, callback_queue: str, diff --git a/modular_sdk/services/impl/maestro_http_transport_service.py b/modular_sdk/services/impl/maestro_http_transport_service.py new file mode 100644 index 0000000..e4d982e --- /dev/null +++ b/modular_sdk/services/impl/maestro_http_transport_service.py @@ -0,0 +1,164 @@ +import binascii +import json +import urllib.request +import threading +import urllib.parse +import urllib.error +from typing import Any +from modular_sdk.commons import ( + ModularException, generate_id, build_secure_message, build_message, +) +from modular_sdk.commons.constants import SUCCESS_STATUS +from modular_sdk.commons.log_helper import get_logger +from modular_sdk.services.impl.maestro_signature_builder import ( + MaestroSignatureBuilder, +) +from modular_sdk.services.rabbit_transport_service import AbstractTransport + +HTTP_DEFAULT_RESPONSE_TIMEOUT = 30 +_LOG = get_logger(__name__) + + +class MaestroHTTPTransport(AbstractTransport): + def __init__( + self, + sdk_access_key: str, + sdk_secret_key: str, + maestro_user: str, + api_link: str, + timeout: int = HTTP_DEFAULT_RESPONSE_TIMEOUT, + ): + self.access_key = sdk_access_key + self.secret_key = sdk_secret_key + self.user = maestro_user + self.api_link = api_link + self.timeout = timeout + + def pre_process_request(self, command_name: str, parameters: list[dict] | dict, + secure_parameters: list | None = None, + is_flat_request: bool = False, + async_request: bool = False, + compressed: bool = False, config=None + ) -> tuple[bytes, dict]: + request_id = generate_id() + _LOG.debug('Going to pre-process HTTP request') + message = build_message( + command_name=command_name, + parameters=parameters, + request_id=request_id, + is_flat_request=is_flat_request, + compressed=compressed, + ) + secure_message = message + # todo that is strange because why uncompressed data + # should lack parameters? + if not compressed: + secure_message = build_secure_message( + command_name=command_name, + parameters_to_secure=parameters, + secure_parameters=secure_parameters, + request_id=request_id, + is_flat_request=is_flat_request, + ) + _LOG.debug( + f'Prepared command: {command_name}\nCommand format: {secure_message}' + ) + signer = MaestroSignatureBuilder( + access_key=config.sdk_access_key if config and config.sdk_access_key else self.access_key, + secret_key=config.sdk_secret_key if config and config.sdk_secret_key else self.secret_key, + user=config.maestro_user if config and config.maestro_user else self.user, + ) + encrypted_body = signer.encrypt(data=message) + + _LOG.debug('Message encrypted') + # sign headers + headers = signer.get_signed_headers( + async_request=async_request, compressed=compressed, + ) + _LOG.debug('Signed headers prepared') + return encrypted_body, headers + + def post_process_request(self, response: bytes) -> tuple[int, str, Any]: + signer = MaestroSignatureBuilder( + access_key=self.access_key, + secret_key=self.secret_key, + user=self.user, + ) + try: + response_item = signer.decrypt(data=response) + _LOG.debug('Message from M3-server successfully decrypted') + except binascii.Error: + response_item = response.decode('utf-8') + try: + _LOG.debug(f'Raw decrypted message from server: {response_item}') + response_json = json.loads(response_item).get('results')[0] + except json.decoder.JSONDecodeError: + _LOG.error('Response cannot be decoded - invalid JSON string') + raise ModularException(code=502, content="Response can't be decoded") + status = response_json.get('status') + status_code = response_json.get('statusCode') + warnings = response_json.get('warnings') + if status == SUCCESS_STATUS: + data = response_json.get('data') + else: + data = response_json.get('error') or response_json.get('readableError') + try: + data = json.loads(data) + except json.decoder.JSONDecodeError: + data = data + response = {'status': status,'status_code': status_code} + if isinstance(data, str): + response.update({'message': data}) + if isinstance(data, dict): + data = [data] + if isinstance(data, list): + response.update({'items': data}) + if items := response_json.get('items'): + response.update({'items': items}) + if table_title := response_json.get('tableTitle'): + response.update({'table_title': table_title}) + if warnings: + response.update({'warnings': warnings}) + return status_code, status, response + + def send_sync(self, command_name: str, parameters: list[dict] | dict, + secure_parameters: list | None = None, + is_flat_request: bool = False, async_request: bool = False, + compressed: bool = False, config=None + ) -> tuple[int, str, Any]: + _LOG.debug('Making sync http request ') + message, headers = self.pre_process_request( + command_name=command_name, + parameters=parameters, + secure_parameters=secure_parameters, + is_flat_request=is_flat_request, + async_request=async_request, + compressed=compressed, + config=config + ) + req = urllib.request.Request( + url=self.api_link, headers=headers, data=message, method='POST', + ) + try: + with urllib.request.urlopen(req, timeout=self.timeout) as response: + _LOG.debug( + f'Response status code: {response.getcode()}, reason: {response.reason}') + return self.post_process_request(response.read()) + except urllib.error.HTTPError as e: + raise ModularException(code=e.getcode(), content=e.read().decode()) + except urllib.error.URLError as e: + _LOG.exception('Cannot make a request') + raise ModularException(code=502, + content='Could not make the request') + + def send_async(self, *args, **kwargs) -> None: + message, headers = self.pre_process_request(*args, **kwargs) + req = urllib.request.Request( + url=self.api_link, headers=headers, data=message, method='POST', + ) + + def _send(r, t): + with urllib.request.urlopen(r, timeout=t) as resp: + _LOG.info('Async request sent. No response will be processed') + + threading.Thread(target=_send, args=(req, self.timeout)).start() diff --git a/modular_sdk/services/impl/maestro_rabbit_transport_service.py b/modular_sdk/services/impl/maestro_rabbit_transport_service.py index 7b61470..5669f2a 100644 --- a/modular_sdk/services/impl/maestro_rabbit_transport_service.py +++ b/modular_sdk/services/impl/maestro_rabbit_transport_service.py @@ -1,30 +1,32 @@ -import base64 import binascii -import gzip -import hashlib -import hmac import json -import os -import uuid -from datetime import datetime - -from modular_sdk.commons import ModularException +from typing import Any +from modular_sdk.commons import ( + ModularException, generate_id, build_secure_message, build_message, +) +from modular_sdk.commons.constants import ( + PLAIN_CONTENT_TYPE, + SUCCESS_STATUS, + ERROR_STATUS, + RESULTS, + DATA +) # todo remove these imports with major release. They can be used from outside from modular_sdk.commons.log_helper import get_logger -from modular_sdk.services.rabbit_transport_service import RabbitMQTransport, \ - RabbitConfig - -_LOG = get_logger('rabbit_transport_service') +from modular_sdk.connections.rabbit_connection import RabbitMqConnection +from modular_sdk.services.impl.maestro_signature_builder import ( + MaestroSignatureBuilder, +) +from modular_sdk.services.rabbit_transport_service import ( + RabbitMQTransport, RabbitConfig, +) -PLAIN_CONTENT_TYPE = 'text/plain' -SUCCESS_STATUS = 'SUCCESS' -ERROR_STATUS = 'FAILED' -RESULTS = 'results' -DATA = 'data' +_LOG = get_logger(__name__) class MaestroRabbitConfig(RabbitConfig): - def __init__(self, request_queue, response_queue, rabbit_exchange, - sdk_access_key, sdk_secret_key, maestro_user): + def __init__(self, request_queue: str, response_queue: str, + rabbit_exchange: str, + sdk_access_key: str, sdk_secret_key: str, maestro_user: str): super(MaestroRabbitConfig, self).__init__( request_queue=request_queue, response_queue=response_queue, @@ -36,7 +38,8 @@ def __init__(self, request_queue, response_queue, rabbit_exchange, class MaestroRabbitMQTransport(RabbitMQTransport): - def __init__(self, rabbit_connection, config): + def __init__(self, rabbit_connection: RabbitMqConnection, + config: MaestroRabbitConfig): super().__init__( rabbit_connection=rabbit_connection, config=config @@ -47,204 +50,69 @@ def __init__(self, rabbit_connection, config): def pre_process_request(self, command_name, parameters, secure_parameters, is_flat_request, async_request, compressed=False, - config=None): - request_id = self._generate_id() - + config=None) -> tuple[str | bytes, dict]: + request_id = generate_id() _LOG.debug('Going to pre-process request') - message = self._build_message( + message = build_message( command_name=command_name, parameters=parameters, - id=request_id, + request_id=request_id, is_flat_request=is_flat_request, compressed=compressed ) secure_message = message if not compressed: - secure_message = self._build_secure_message( + secure_message = build_secure_message( command_name=command_name, parameters_to_secure=parameters, secure_parameters=secure_parameters, - id=request_id, + request_id=request_id, is_flat_request=is_flat_request ) _LOG.debug('Prepared command: {0}\nCommand format: {1}' .format(command_name, secure_message)) - encrypted_body = self._encrypt( + signer = MaestroSignatureBuilder( + access_key=config.sdk_access_key if config and config.sdk_access_key else self.access_key, secret_key=config.sdk_secret_key if config and config.sdk_secret_key else self.secret_key, - data=message + user=config.maestro_user if config and config.maestro_user else self.user, ) + encrypted_body = signer.encrypt(data=message) + _LOG.debug('Message encrypted') # sign headers - headers = self._get_signed_headers( - access_key=config.sdk_access_key if config and config.sdk_access_key else self.access_key, - secret_key=config.sdk_secret_key if config and config.sdk_secret_key else self.secret_key, - user=config.maestro_user if config and config.maestro_user else self.user, + headers = signer.get_signed_headers( async_request=async_request, compressed=compressed ) _LOG.debug('Signed headers prepared') return encrypted_body, headers - def post_process_request(self, response): + def post_process_request(self, response: bytes) -> tuple[int, str, Any]: + # TODO post process does not accept config whereas pre process accepts + signer = MaestroSignatureBuilder( + access_key=self.access_key, + secret_key=self.secret_key, + user=self.user + ) try: - response_item = self._decrypt( - secret_key=self.secret_key, - data=response - ) + response_item = signer.decrypt(data=response) _LOG.debug('Message from M3-server successfully decrypted') except binascii.Error: response_item = response.decode('utf-8') try: _LOG.debug(f'Raw decrypted message from server: {response_item}') response_json = json.loads(response_item).get('results')[0] - status = response_json.get('status') - code = response_json.get('statusCode') - if status == SUCCESS_STATUS: - data = response_json.get('data') - return code, status, data - else: - data = response_json.get('readableError') - return code, status, data - except json.decoder.JSONDecodeError: _LOG.error('Response can not be decoded - invalid Json string') raise ModularException( - code=502, - content='Response can not be decoded' + code=502, content='Response can not be decoded' ) - - @staticmethod - def _generate_id(): - return str(uuid.uuid4()) - - @staticmethod - def _decrypt(secret_key, data): - """ - Decode received message from Base64 format, cut initialization - vector ("iv") from beginning of the message, decrypt message - """ - from cryptography.hazmat.primitives.ciphers import Cipher, \ - algorithms, modes - decoded_data = base64.b64decode(data) - iv = decoded_data[:12] - encrypted_data = decoded_data[12:] - cipher = Cipher( - algorithms.AES(key=secret_key.encode('utf-8')), - modes.GCM(initialization_vector=iv) - ).decryptor() - origin_data_with_iv = cipher.update(encrypted_data) - # Due to Initialization vector in encrypting method - # there is need to split useful and useless parts of the - # server response. - response = origin_data_with_iv[:-16] - return response - - @staticmethod - def _encrypt(secret_key, data): - """ - Encrypt data, add initialization vector ("iv") at beginning of encrypted - message and encode entire data in Base64 format - """ - if not secret_key: - raise ModularException( - code=503, - content='Cannot detect secret_key. Please add it first' - ) - from cryptography.hazmat.primitives.ciphers.aead import AESGCM - iv = os.urandom(12) - plain_text = data if isinstance(data, str) else json.dumps(data) - data_in_bytes = plain_text.encode('utf-8') - try: - cipher = AESGCM(key=secret_key.encode('utf-8')) - except ValueError as e: - raise ValueError(str(e).replace('AESGCM key', 'Secret Key')) - encrypted_data = cipher.encrypt( - nonce=iv, data=data_in_bytes, associated_data=None) - encrypted_data_with_iv = bytes(iv) + encrypted_data - base64_request = base64.b64encode(encrypted_data_with_iv) - return base64_request - - @staticmethod - def _get_signed_headers(access_key: (str, int), secret_key: str, user: str, - async_request: bool = False, - compressed: bool = False) -> dict: - """ - Create and sign necessary headers for interaction with Maestro API - """ - if not access_key or not user: - raise ModularException( - code=503, - content='Cannot detect access_key or user. Please add it first' - ) - date = int(datetime.now().timestamp()) * 1000 - signature = hmac.new( - key=bytearray(f'{secret_key}{date}'.encode('utf-8')), - msg=bytearray(f'M3-POST:{access_key}:{date}:{user}'.encode('utf-8') - ), - digestmod=hashlib.sha256 - ).hexdigest() - n = 2 - resolved_signature = '' - for each in [signature[i:i + n] for i in range(0, len(signature), n)]: - resolved_signature += '1' + each - headers = { - "maestro-authentication": resolved_signature, - "maestro-request-identifier": "api-server", - "maestro-user-identifier": user, - "maestro-date": str(date), - "maestro-accesskey": str(access_key), - "maestro-sdk-version": "3.2.80", - "maestro-sdk-async": 'true' if async_request else 'false', - "compressed": True if compressed else False - } - return headers - - @staticmethod - def _build_payload(id, command_name, parameters, is_flat_request): - if is_flat_request: - parameters.update({'type': command_name}) - result = [ - { - 'id': id, - 'type': None, - 'params': parameters - } - ] - else: - result = [ - { - 'id': id, - 'type': command_name, - 'params': parameters - } - ] - return result - - def _build_message(self, id, command_name, parameters, - is_flat_request=False, compressed=False): - if isinstance(parameters, list): - result = [] - for payload in parameters: - result.extend(self._build_payload(id, command_name, payload, - is_flat_request)) + status = response_json.get('status') + code = response_json.get('statusCode') + if status == SUCCESS_STATUS: + data = response_json.get('data') + return code, status, data else: - result = self._build_payload(id, command_name, parameters, - is_flat_request) - if compressed: - return base64.b64encode(gzip.compress( - json.dumps(result).encode('UTF-8'))).decode() - return result - - def _build_secure_message(self, id, command_name, parameters_to_secure, - secure_parameters=None, is_flat_request=False): - if not secure_parameters: - secure_parameters = [] - secured_parameters = {k: (v if k not in secure_parameters else '*****') - for k, v in parameters_to_secure.items()} - return self._build_message( - command_name=command_name, - parameters=secured_parameters, - id=id, - is_flat_request=is_flat_request - ) + data = response_json.get('readableError') + return code, status, data diff --git a/modular_sdk/services/impl/maestro_signature_builder.py b/modular_sdk/services/impl/maestro_signature_builder.py new file mode 100644 index 0000000..266e329 --- /dev/null +++ b/modular_sdk/services/impl/maestro_signature_builder.py @@ -0,0 +1,89 @@ +import base64 +import hashlib +import hmac +import json +import os +import time +from modular_sdk.commons.constants import PLAIN_CONTENT_TYPE + + +class MaestroSignatureBuilder: + __slots__ = '_access_key', '_secret_key', '_user' + + def __init__(self, access_key: str, secret_key: str, user: str): + self._access_key = access_key + self._secret_key = secret_key + self._user = user + + def decrypt(self, data: bytes | str) -> bytes: + """ + Decode received message from Base64 format, cut initialization + vector ("iv") from beginning of the message, decrypt message + """ + from cryptography.hazmat.primitives.ciphers import ( + Cipher, algorithms, modes, + ) + decoded_data = base64.b64decode(data) + iv = decoded_data[:12] + encrypted_data = decoded_data[12:] + cipher = Cipher( + algorithms.AES(key=self._secret_key.encode('utf-8')), + modes.GCM(initialization_vector=iv) + ).decryptor() + origin_data_with_iv = cipher.update(encrypted_data) + # Due to Initialization vector in encrypting method + # there is need to split useful and useless parts of the + # server response. + response = origin_data_with_iv[:-16] + return response + + def encrypt(self, data: str | dict | list) -> bytes: + """ + Encrypt data, add initialization vector ("iv") at beginning of encrypted + message and encode entire data in Base64 format + """ + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + iv = os.urandom(12) + plain_text = data if isinstance(data, str) else json.dumps( + data, + separators=(',', ':') + ) + data_in_bytes = plain_text.encode('utf-8') + try: + cipher = AESGCM(key=self._secret_key.encode('utf-8')) + except ValueError as e: + raise ValueError(str(e).replace('AESGCM key', 'Secret Key')) + encrypted_data = cipher.encrypt( + nonce=iv, data=data_in_bytes, associated_data=None) + encrypted_data_with_iv = bytes(iv) + encrypted_data + return base64.b64encode(encrypted_data_with_iv) + + def get_signed_headers(self, async_request: bool = False, + compressed: bool = False) -> dict: + """ + Create and sign necessary headers for interaction with Maestro API + """ + date = int(time.time() * 1000) + signature = hmac.new( + key=bytearray(f'{self._secret_key}{date}'.encode('utf-8')), + msg=bytearray( + f'M3-POST:{self._access_key}:{date}:{self._user}'.encode( + 'utf-8') + ), + digestmod=hashlib.sha256 + ).hexdigest() + n = 2 + resolved_signature = '' + for each in [signature[i:i + n] for i in range(0, len(signature), n)]: + resolved_signature += '1' + each + return { + "maestro-authentication": resolved_signature, + "maestro-request-identifier": "api-server", + "maestro-user-identifier": self._user, + "maestro-date": str(date), + "maestro-accesskey": str(self._access_key), + "maestro-sdk-version": "3.2.80", + "maestro-sdk-async": 'true' if async_request else 'false', + "compressed": 'true' if compressed else 'false', + "Content-Type": PLAIN_CONTENT_TYPE, + } diff --git a/modular_sdk/services/rabbit_transport_service.py b/modular_sdk/services/rabbit_transport_service.py index 0d98769..7a5ac09 100644 --- a/modular_sdk/services/rabbit_transport_service.py +++ b/modular_sdk/services/rabbit_transport_service.py @@ -1,43 +1,62 @@ from abc import abstractmethod -import uuid -from modular_sdk.commons import ModularException -from modular_sdk.commons.log_helper import get_logger +from typing import TYPE_CHECKING, Any from pika import exceptions +from modular_sdk.commons import ModularException, generate_id_hex +from modular_sdk.commons.constants import ( + PLAIN_CONTENT_TYPE, + SUCCESS_STATUS, + ERROR_STATUS, + RESULTS, + DATA +) # todo remove these imports with major release. They can be used from outside +from modular_sdk.commons.log_helper import get_logger + +if TYPE_CHECKING: + from modular_sdk.connections.rabbit_connection import RabbitMqConnection + +_LOG = get_logger(__name__) + -_LOG = get_logger('RemoteExecutionService') +class AbstractTransport: + def send_sync(self, *args, **kwargs) -> tuple[int, str, Any]: + """ + Can raise ModularException + """ -PLAIN_CONTENT_TYPE = 'text/plain' -SUCCESS_STATUS = 'SUCCESS' -ERROR_STATUS = 'FAILED' -RESULTS = 'results' -DATA = 'data' + def send_async(self, *args, **kwargs) -> None: + pass class RabbitConfig: - def __init__(self, request_queue, response_queue, rabbit_exchange): + def __init__(self, request_queue: str, response_queue: str, + rabbit_exchange: str): self.request_queue = request_queue self.response_queue = response_queue self.rabbit_exchange = rabbit_exchange -class RabbitMQTransport: - def __init__(self, rabbit_connection, config): +class RabbitMQTransport(AbstractTransport): + def __init__(self, rabbit_connection: 'RabbitMqConnection', + config: RabbitConfig): self.rabbit = rabbit_connection self.request_queue = config.request_queue self.response_queue = config.response_queue self.exchange = config.rabbit_exchange @abstractmethod - def pre_process_request(self, *args, **kwargs): - # signing, encypt - pass + def pre_process_request(self, *args, **kwargs) -> tuple[str | bytes, dict]: + """ + Must return tuple that contains message and headers + """ @abstractmethod - def post_process_request(self, *args, **kwargs): - # sign check, decrypt - pass + def post_process_request(self, *args, **kwargs) -> tuple[int, str, str]: + """ + Must return a tuple that contains code status and response + """ - def __resolve_rabbit_options(self, exchange, request_queue, response_queue): + def __resolve_rabbit_options(self, exchange, request_queue, + response_queue) -> tuple[str, str, str]: exchange = exchange or self.exchange if exchange: routing_key = '' @@ -48,7 +67,7 @@ def __resolve_rabbit_options(self, exchange, request_queue, response_queue): response_queue = response_queue if response_queue else self.response_queue return routing_key, exchange, response_queue - def send_sync(self, *args, **kwargs): + def send_sync(self, *args, **kwargs) -> tuple[int, str, Any]: message, headers = self.pre_process_request(*args, **kwargs) rabbit_config = kwargs.get('config') request_queue, exchange, response_queue = \ @@ -58,17 +77,18 @@ def send_sync(self, *args, **kwargs): response_queue=rabbit_config.response_queue if rabbit_config else None ) - request_id = uuid.uuid4().hex + correlation_id = generate_id_hex() self.rabbit.publish_sync(routing_key=request_queue, exchange=exchange, callback_queue=response_queue, - correlation_id=request_id, + correlation_id=correlation_id, message=message, headers=headers, content_type=PLAIN_CONTENT_TYPE) try: - response_item = self.rabbit.consume_sync(queue=response_queue, - correlation_id=request_id) + response_item = self.rabbit.consume_sync( + queue=response_queue, correlation_id=correlation_id, + ) except exceptions.ConnectionWrongStateError as e: raise ModularException(code=502, content=str(e)) @@ -80,7 +100,7 @@ def send_sync(self, *args, **kwargs): ) return self.post_process_request(response=response_item) - def send_async(self, *args, **kwargs): + def send_async(self, *args, **kwargs) -> None: message, headers = self.pre_process_request(*args, **kwargs) rabbit_config = kwargs.get('config') request_queue, exchange, response_queue = \ @@ -96,4 +116,3 @@ def send_async(self, *args, **kwargs): message=message, headers=headers, content_type=PLAIN_CONTENT_TYPE) - diff --git a/modular_sdk/utils/runtime_tracer/generic.py b/modular_sdk/utils/runtime_tracer/generic.py index ac475cd..1d93806 100644 --- a/modular_sdk/utils/runtime_tracer/generic.py +++ b/modular_sdk/utils/runtime_tracer/generic.py @@ -1,6 +1,7 @@ import json import re from datetime import datetime +from modular_sdk.commons import generate_id_hex from modular_sdk.services.environment_service import EnvironmentService from modular_sdk.services.events_service import EventsService from modular_sdk.services.lambda_service import LambdaService @@ -43,8 +44,7 @@ def __init__(self, sqs_service: SQSService, self.processed_traces = [] def start(self): - import uuid - name = uuid.uuid4().hex + name = generate_id_hex() segment = Segment( name=name, tracer=self diff --git a/pyproject.toml b/pyproject.toml index de8bac8..e4a88bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "modular_sdk" -version = "6.1.1" +version = "6.2.0" authors = [ {name = "EPAM Systems", email = "support@syndicate.team"} ] @@ -54,9 +54,9 @@ include = ["modular_sdk/"] exclude = [ "**/__pycache__", ] -pythonVersion = "3.8" +pythonVersion = "3.10" reportIncompatibleMethodOverride = "warning" executionEnvironments = [ - {root = "tests/", pythonVersion = "3.8", extraPaths = ["modular_sdk/"]} + {root = "tests/", pythonVersion = "3.10", extraPaths = ["modular_sdk/"]} ] diff --git a/tests/test_maestro_signature_builder.py b/tests/test_maestro_signature_builder.py new file mode 100644 index 0000000..b01f484 --- /dev/null +++ b/tests/test_maestro_signature_builder.py @@ -0,0 +1,71 @@ +import json +import random +import secrets +from unittest.mock import patch + +import pytest + +from modular_sdk.services.impl.maestro_signature_builder import \ + MaestroSignatureBuilder + + +@pytest.fixture +def secret_key() -> str: + return secrets.token_hex(random.choice((8, 12, 16))) + + +def test_encrypt_decrypt(secret_key): + signer = MaestroSignatureBuilder( + access_key='access_key', + secret_key=secret_key, + user='user' + ) + data = 'my-secret-data' + assert signer.decrypt(signer.encrypt(data)) == data.encode() + + data = [1, 2, 3, 'my-data'] + assert json.loads(signer.decrypt(signer.encrypt(data))) == data + + data = {'key': 'value', 'list': [1, 2, {'key1': 'value1'}]} + assert json.loads(signer.decrypt(signer.encrypt(data))) == data + + +def test_encrypt(): + signer = MaestroSignatureBuilder( + access_key='access_key', + secret_key='1234567890123456', + user='user' + ) + with patch('os.urandom', lambda x: b'1' * 12): + val = signer.encrypt('secret-data') + assert val == b'MTExMTExMTExMTEx4pEMpmk6f+Ih4nJj3fK21M1TpP3VE/r/pggy' + + +def test_decrypt(): + signer = MaestroSignatureBuilder( + access_key='access_key', + secret_key='1234567890123456', + user='user' + ) + assert signer.decrypt( + b'kVUII6Yho1wGMkVQuKP8vFjt5iTwAoEWrrwqgSVx251IeXBcbP3AHQ==') == b'secret-data2' + + +def test_get_headers(): + signer = MaestroSignatureBuilder( + access_key='access_key', + secret_key='1234567890123456', + user='user' + ) + with patch('time.time', lambda: 1728655109.171027): + headers = signer.get_signed_headers( + async_request=True, + compressed=True + ) + assert headers == { + 'maestro-authentication': '19a1e711211517d16a1d015910a1141191a111111f12b16714714a1b41941111001c319b13e10114b1d412417b1f21e9', + 'maestro-request-identifier': 'api-server', + 'maestro-user-identifier': 'user', 'maestro-date': '1728655109171', + 'maestro-accesskey': 'access_key', 'maestro-sdk-version': '3.2.80', + 'maestro-sdk-async': 'true', 'compressed': True + } diff --git a/tests/test_utils.py b/tests/test_utils.py index b0cd40e..3e945f3 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,6 +1,10 @@ +import base64 +import json +import gzip + import pytest -from modular_sdk.commons import dict_without +from modular_sdk.commons import dict_without, build_payload, build_message, build_secure_message @pytest.fixture @@ -79,3 +83,27 @@ def test_dict_without(dict_sample): }, ] } + + +def test_build_payload(): + assert build_payload('id', 'name', {'key': 'value'}, False) == [ + {'id': 'id', 'type': 'name', 'params': {'key': 'value'}} + ] + assert build_payload('id', 'name', {'key': 'value'}, True) == [ + {'id': 'id', 'type': None, 'params': {'key': 'value', 'type': 'name'}} + ] + + +def test_build_message(): + assert build_message('id', 'name', [{'key1': 'value1'}, {'key2': 'value2'}]) == [{'id': 'id', 'type': 'name', 'params': {'key1': 'value1'}}, {'id': 'id', 'type': 'name', 'params': {'key2': 'value2'}}] + assert build_message('id', 'name', {'key1': 'value1'}) == [{'id': 'id', 'type': 'name', 'params': {'key1': 'value1'}}] + + assert build_message('id', 'name', {'key1': 'value1'}, True) == [{'id': 'id', 'type': None, 'params': {'key1': 'value1', 'type': 'name'}}] + + res = build_message('id', 'name', {'key1': 'value1'}, True, True) + assert json.loads(gzip.decompress(base64.b64decode(res))) == [{'id': 'id', 'type': None, 'params': {'key1': 'value1', 'type': 'name'}}] + + +def test_build_secure_message(): + # weird thing + assert build_secure_message('id', 'name', {'key': 'value', 'key1': 'value1'}, ['key'], True) == [{'id': 'id', 'type': None, 'params': {'key': '*****', 'key1': 'value1', 'type': 'name'}}] \ No newline at end of file