Skip to content

Commit

Permalink
Merge pull request #86 from epam/develop
Browse files Browse the repository at this point in the history
add `HTTPStrategy` and `AbstractStrategy` (#84)
  • Loading branch information
bohdan-onsha authored Oct 18, 2024
2 parents 2612d5c + 79698b4 commit 09ea174
Show file tree
Hide file tree
Showing 12 changed files with 538 additions and 224 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [6.2.0] - 2024-10-04
- added `MaestroHTTPTransport` to allow to interact with maestro using https protocol

## [6.1.1] - 2024-10-04
- added `MODULAR_SDK_HTTP_PROXY` and `MODULAR_SDK_HTTPS_PROXY` envs

Expand All @@ -17,7 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Dump lib version to `pynamodb==5.3.2` and `dynamodb-json~=1.3`

## [6.0.1b1] - 2024-09-11
- fix rabbitmq bug when a wrong message was consumed aster pushing
- fix rabbitmq bug when a wrong message was consumed after pushing

## [6.0.0] - 2024-08-20
- Split `SIEM_DEFECT_DOJO` Parent type into:
Expand Down
65 changes: 65 additions & 0 deletions modular_sdk/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import base64
import copy
import dataclasses
import gzip
import json
import warnings
from functools import partial
from uuid import uuid4
Expand Down Expand Up @@ -35,6 +38,7 @@ def deprecated_func(*args, **kwargs):
return deprecated_decorator


# todo remove with major release
@deprecated('not a part of the lib')
def build_response(content, code=200):
if code == RESPONSE_OK_CODE:
Expand Down Expand Up @@ -91,6 +95,67 @@ def generate_id():
return str(uuid4())


def generate_id_hex():
return str(uuid4().hex)


def build_secure_message(
request_id: str,
command_name: str,
parameters_to_secure: dict,
secure_parameters: list[str] | None = None,
is_flat_request: bool = False,
) -> list[dict] | str:
if not secure_parameters:
secure_parameters = []
secured_parameters = {
k: (v if k not in secure_parameters else '*****')
for k, v in parameters_to_secure.items()
}
return build_message(
request_id=request_id,
command_name=command_name,
parameters=secured_parameters,
is_flat_request=is_flat_request,
)


def build_message(
request_id: str,
command_name: str,
parameters: list[dict] | dict,
is_flat_request: bool = False,
compressed: bool = False,
) -> list[dict] | str:
if isinstance(parameters, list):
result = []
for payload in parameters:
result.extend(
build_payload(request_id, command_name, payload, is_flat_request)
)
else:
result = \
build_payload(request_id, command_name, parameters, is_flat_request)
if compressed:
return base64 \
.b64encode(gzip.compress(json.dumps(result, separators=(',', ':')).encode('UTF-8'))).decode()
return result


def build_payload(
request_id: str,
command_name: str,
parameters: dict,
is_flat_request: bool,
) -> list[dict]:
if is_flat_request:
parameters.update({'type': command_name})
result = [{'id': request_id, 'type': None, 'params': parameters}]
else:
result = [{'id': request_id, 'type': command_name,'params': parameters}]
return result


def default_instance(value, _type: type, *args, **kwargs):
return value if isinstance(value, _type) else _type(*args, **kwargs)

Expand Down
6 changes: 6 additions & 0 deletions modular_sdk/commons/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,9 @@ def iter(cls):
JOB_SUCCESS_STATE = 'SUCCESS'
JOB_FAIL_STATE = 'FAIL'
JOB_RUNNING_STATE = 'RUNNING'

PLAIN_CONTENT_TYPE = 'text/plain'
SUCCESS_STATUS = 'SUCCESS'
ERROR_STATUS = 'FAILED'
RESULTS = 'results'
DATA = 'data'
15 changes: 8 additions & 7 deletions modular_sdk/connections/rabbit_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from modular_sdk.commons.log_helper import get_logger

RABBIT_DEFAULT_RESPONSE_TIMEOUT = 30
_LOG = get_logger('modular_sdk-rabbit_connection')
_LOG = get_logger(__name__)


class RabbitMqConnection:
def __init__(self, connection_url: str, timeout: int):
def __init__(self, connection_url: str,
timeout: int = RABBIT_DEFAULT_RESPONSE_TIMEOUT):
self.connection_url = connection_url
self.timeout = timeout or RABBIT_DEFAULT_RESPONSE_TIMEOUT
self.timeout = timeout
self.responses = {}

def _open_channel(self):
Expand All @@ -26,7 +27,7 @@ def _open_channel(self):
code=502, content=f'Connection to RabbitMQ refused: {error_msg}'
)

def _close(self):
def _close(self) -> None:
try:
if self.conn.is_open:
self.conn.close()
Expand All @@ -38,8 +39,8 @@ def publish(
message: str,
routing_key: str,
exchange: str = '',
headers: dict = None,
content_type: str = None,
headers: dict | None = None,
content_type: str | None = None,
) -> None:
_LOG.debug(f'Request queue: {routing_key}')
channel = self._open_channel()
Expand Down Expand Up @@ -81,7 +82,7 @@ def __basic_publish(

def publish_sync(
self,
message: str,
message: str | bytes,
routing_key: str,
correlation_id: str,
callback_queue: str,
Expand Down
164 changes: 164 additions & 0 deletions modular_sdk/services/impl/maestro_http_transport_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import binascii
import json
import urllib.request
import threading
import urllib.parse
import urllib.error
from typing import Any
from modular_sdk.commons import (
ModularException, generate_id, build_secure_message, build_message,
)
from modular_sdk.commons.constants import SUCCESS_STATUS
from modular_sdk.commons.log_helper import get_logger
from modular_sdk.services.impl.maestro_signature_builder import (
MaestroSignatureBuilder,
)
from modular_sdk.services.rabbit_transport_service import AbstractTransport

HTTP_DEFAULT_RESPONSE_TIMEOUT = 30
_LOG = get_logger(__name__)


class MaestroHTTPTransport(AbstractTransport):
def __init__(
self,
sdk_access_key: str,
sdk_secret_key: str,
maestro_user: str,
api_link: str,
timeout: int = HTTP_DEFAULT_RESPONSE_TIMEOUT,
):
self.access_key = sdk_access_key
self.secret_key = sdk_secret_key
self.user = maestro_user
self.api_link = api_link
self.timeout = timeout

def pre_process_request(self, command_name: str, parameters: list[dict] | dict,
secure_parameters: list | None = None,
is_flat_request: bool = False,
async_request: bool = False,
compressed: bool = False, config=None
) -> tuple[bytes, dict]:
request_id = generate_id()
_LOG.debug('Going to pre-process HTTP request')
message = build_message(
command_name=command_name,
parameters=parameters,
request_id=request_id,
is_flat_request=is_flat_request,
compressed=compressed,
)
secure_message = message
# todo that is strange because why uncompressed data
# should lack parameters?
if not compressed:
secure_message = build_secure_message(
command_name=command_name,
parameters_to_secure=parameters,
secure_parameters=secure_parameters,
request_id=request_id,
is_flat_request=is_flat_request,
)
_LOG.debug(
f'Prepared command: {command_name}\nCommand format: {secure_message}'
)
signer = MaestroSignatureBuilder(
access_key=config.sdk_access_key if config and config.sdk_access_key else self.access_key,
secret_key=config.sdk_secret_key if config and config.sdk_secret_key else self.secret_key,
user=config.maestro_user if config and config.maestro_user else self.user,
)
encrypted_body = signer.encrypt(data=message)

_LOG.debug('Message encrypted')
# sign headers
headers = signer.get_signed_headers(
async_request=async_request, compressed=compressed,
)
_LOG.debug('Signed headers prepared')
return encrypted_body, headers

def post_process_request(self, response: bytes) -> tuple[int, str, Any]:
signer = MaestroSignatureBuilder(
access_key=self.access_key,
secret_key=self.secret_key,
user=self.user,
)
try:
response_item = signer.decrypt(data=response)
_LOG.debug('Message from M3-server successfully decrypted')
except binascii.Error:
response_item = response.decode('utf-8')
try:
_LOG.debug(f'Raw decrypted message from server: {response_item}')
response_json = json.loads(response_item).get('results')[0]
except json.decoder.JSONDecodeError:
_LOG.error('Response cannot be decoded - invalid JSON string')
raise ModularException(code=502, content="Response can't be decoded")
status = response_json.get('status')
status_code = response_json.get('statusCode')
warnings = response_json.get('warnings')
if status == SUCCESS_STATUS:
data = response_json.get('data')
else:
data = response_json.get('error') or response_json.get('readableError')
try:
data = json.loads(data)
except json.decoder.JSONDecodeError:
data = data
response = {'status': status,'status_code': status_code}
if isinstance(data, str):
response.update({'message': data})
if isinstance(data, dict):
data = [data]
if isinstance(data, list):
response.update({'items': data})
if items := response_json.get('items'):
response.update({'items': items})
if table_title := response_json.get('tableTitle'):
response.update({'table_title': table_title})
if warnings:
response.update({'warnings': warnings})
return status_code, status, response

def send_sync(self, command_name: str, parameters: list[dict] | dict,
secure_parameters: list | None = None,
is_flat_request: bool = False, async_request: bool = False,
compressed: bool = False, config=None
) -> tuple[int, str, Any]:
_LOG.debug('Making sync http request ')
message, headers = self.pre_process_request(
command_name=command_name,
parameters=parameters,
secure_parameters=secure_parameters,
is_flat_request=is_flat_request,
async_request=async_request,
compressed=compressed,
config=config
)
req = urllib.request.Request(
url=self.api_link, headers=headers, data=message, method='POST',
)
try:
with urllib.request.urlopen(req, timeout=self.timeout) as response:
_LOG.debug(
f'Response status code: {response.getcode()}, reason: {response.reason}')
return self.post_process_request(response.read())
except urllib.error.HTTPError as e:
raise ModularException(code=e.getcode(), content=e.read().decode())
except urllib.error.URLError as e:
_LOG.exception('Cannot make a request')
raise ModularException(code=502,
content='Could not make the request')

def send_async(self, *args, **kwargs) -> None:
message, headers = self.pre_process_request(*args, **kwargs)
req = urllib.request.Request(
url=self.api_link, headers=headers, data=message, method='POST',
)

def _send(r, t):
with urllib.request.urlopen(r, timeout=t) as resp:
_LOG.info('Async request sent. No response will be processed')

threading.Thread(target=_send, args=(req, self.timeout)).start()
Loading

0 comments on commit 09ea174

Please sign in to comment.