Skip to content

Commit

Permalink
Updates to the job queuing and reading mechanism for evaluations to f…
Browse files Browse the repository at this point in the history
…low through a redis queue rather than a redis pubsub
  • Loading branch information
christophertubbs committed Jul 12, 2024
1 parent 987042c commit 0dd12ed
Show file tree
Hide file tree
Showing 6 changed files with 855 additions and 114 deletions.
2 changes: 0 additions & 2 deletions python/gui/MaaS/static/maas/js/ready_evaluation.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ async function submit_evaluation(event) {
editorData.view.setOption("readonly", true);
}

$(event.target).button("disable");

$("#tabs").tabs("option", "active", 1);
}

Expand Down
101 changes: 81 additions & 20 deletions python/gui/forwarding/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Defines a websocket consumer that does nothing other than pass messages directly to and from another websocket
to another service
"""
import json
import typing
import pathlib
import asyncio
Expand All @@ -21,15 +22,38 @@
LOGGER = logging.ConfiguredLogger()


ASGIView = typing.Callable[
[
typing.Mapping,
typing.Callable[[typing.Optional[bool], typing.Optional[float]], typing.Mapping],
typing.Callable[[typing.Mapping], None]
], typing.Coroutine[typing.Any, typing.Any, None]
]
"""
The signature for a function that yields an ASGI View
Args:
scope: HTTP Scope information
receive: A function to retrieve data from a queue
send: A function to send data through the socket
"""


class ForwardingSocket(SocketConsumer):
"""
A WebSocket Consumer that simply passes messages to and from another connection
"""
@classmethod
def asgi_from_configuration(
cls,
configuration: ForwardingConfiguration
) -> typing.Coroutine[typing.Any, typing.Any, None]:
def asgi_from_configuration(cls, configuration: ForwardingConfiguration) -> ASGIView:
"""
Create an asgi view with parameters provided by a ForwardingConfiguration
Args:
configuration: A configuration dictating where to forward socket messages
Returns:
A view function that requests may route to
"""
interface = cls.as_asgi(
target_host_name=configuration.name,
target_host_url=configuration.url,
Expand Down Expand Up @@ -101,10 +125,16 @@ def target_host_port(self) -> typing.Optional[typing.Union[str, int]]:

@property
def uses_ssl(self) -> bool:
"""
Whether the websocket connection is using SSL
"""
return self.__use_ssl

@property
def certificate_path(self) -> typing.Optional[str]:
"""
The path to an SSL certificate to use if SSL is to be employed
"""
return self._certificate_path

@property
Expand Down Expand Up @@ -139,31 +169,50 @@ def target_connection_url(self) -> str:

@property
def ssl_context(self) -> typing.Optional[ssl.SSLContext]:
if not self.__use_ssl:
return None

if not self._ssl_context:
"""
Get the SSL context to use if SSL is to be employed when connecting to a remote websocket
"""
if self.__use_ssl and not self._ssl_context:
self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
if not self._certificate_path:
raise ValueError(
f"An SSL certificate is required to connect to {self.__target_host_name} as configured, "
f"but none was given."
)
elif not os.path.exists(self._certificate_path):

if not os.path.exists(self._certificate_path):
raise ValueError(
f"The SSL Certificate needed to connect to {self.__target_host_name} was not "
f"found at {self._certificate_path}"
)
elif os.path.isfile(self._certificate_path):

if os.path.isfile(self._certificate_path):
self._ssl_context.load_verify_locations(cafile=self._certificate_path)
else:
self._ssl_context.load_verify_locations(capath=self._certificate_path)

return self._ssl_context

async def _connect_to_target(self):
"""
Connect to a remote websocket
"""
self.__connection = await connect_to_socket(uri=self.target_connection_url, ssl=self.ssl_context)

try:
# The connection was created, but throw an error if it can't be connected through
await self.__connection.ping()
except BaseException as ping_exception:
message = "Connection to remote server could not be established"
error_message = {
"event": "error",
"data": {
"message": message
}
}
await self.send(json.dumps(error_message))
raise Exception(message) from ping_exception

if self.__listen_task is None:
self.__listen_task = asyncio.create_task(self.listen(), name=f"ListenTo{self.__target_host_name}")

Expand All @@ -174,10 +223,22 @@ async def connect(self):
await super().accept()
await self._connect_to_target()

async def get_connection(self) -> WebSocketClientProtocol:
"""
Get a connection to the remote websocket
Returns:
A socket connection that facilitates sending and receiving messages
"""
if self.__connection is None or self.__connection.closed:
await self._connect_to_target()
return self.__connection

async def disconnect(self, code):
"""
Handler for when a client disconnects
"""
LOGGER.debug(f"Received signal for {self} to disconnect with code {code}")
# Attempt to cancel the task. This is mostly a safety measure. Cancelling here is preferred but a
# failure isn't the end of the world
if self.__listen_task is not None and not self.__listen_task.done():
Expand Down Expand Up @@ -233,14 +294,13 @@ async def listen(self):
"""
Listen for messages from the target connection and send them through the caller connection
"""
if self.__connection is None:
await self._connect_to_target()

async for message in self.__connection:
if isinstance(message, bytes):
await self.send(bytes_data=message)
else:
await self.send(message)
while True:
connection = await self.get_connection()
async for message in connection:
if isinstance(message, bytes):
await self.send(bytes_data=message)
else:
await self.send(message)

async def receive(self, text_data: str = None, bytes_data: bytes = None, **kwargs):
"""
Expand All @@ -253,10 +313,11 @@ async def receive(self, text_data: str = None, bytes_data: bytes = None, **kwarg
bytes_data: Bytes data sent over the socket
**kwargs:
"""
connection = await self.get_connection()
if bytes_data and not text_data:
await self.__connection.send(bytes_data)
await connection.send(bytes_data)
elif text_data:
await self.__connection.send(text_data)
await connection.send(text_data)
else:
LOGGER.warn("A message was received from the client but not text or bytes data was received.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,16 @@ def caller(self) -> str:


async def enqueue_job(launch_parameters: typing.Dict[str, typing.Any]):
"""
Queue up the job configuration for execution
Args:
launch_parameters: Details about how to run an evaluation
"""
with utilities.get_runner_connection() as runner_connection:
publication_response = runner_connection.publish(
publication_response = runner_connection.xadd(
EVALUATION_QUEUE_NAME,
json.dumps(launch_parameters)
launch_parameters
)

# `publish` returns an item that can be awaitable or just about anything else. In case the returned value is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,21 @@
EVALUATION_TEMPLATE_PATH = os.path.join(application_values.STATIC_RESOURCES_PATH, "evaluation_template.json")

class LaunchEvaluation(APIView):
"""
REST View used to post evaluation details and launch them out of process
"""
def post(self, request, *args, **kwargs):
"""
Handle a POST request
Args:
request: The raw request
*args: Positional arguments coming in through the routed URL
**kwargs: Keyword arguments coming in through the routed URL
Returns:
JSON data if this came from a standard request, a redirect if it came through a browser
"""
evaluation_id = request.POST.get("evaluation_id")
evaluation_id = evaluation_id.replace(" ", "_").replace(":", ".")
instructions = request.POST.get("instructions")
Expand All @@ -51,30 +65,50 @@ def post(self, request, *args, **kwargs):
"instructions": instructions
}
connection = utilities.get_redis_connection()
connection.publish("evaluation_jobs", json.dumps(launch_parameters))
connection.xadd(application_values.EVALUATION_QUEUE_NAME, launch_parameters)

return response


def get_evaluation_template() -> str:
"""
Get raw text to serve as a starting point for an evaluation
"""
with open(EVALUATION_TEMPLATE_PATH, "r") as evaluation_template_file:
return evaluation_template_file.read()


def generate_evaluation_id() -> str:
"""
Generate a unique evaluation id
"""
current_date = datetime.now()
date_representation = current_date.strftime("%m-%d_%H.%M")
evaluation_id = f"manual_evaluation_at_{date_representation}"
return evaluation_id


class ReadyListenEvaluation(View):
"""
View to show an editor used for editing, launching, and listening to an evaluation
"""
template = "evaluation_service/ready_evaluation.html"

def get_evaluation_template(self) -> str:
with open(EVALUATION_TEMPLATE_PATH, "r") as evaluation_template_file:
return evaluation_template_file.read()
def get(self, request: HttpRequest) -> HttpResponse:
"""
Handle a GET request
def _generate_evaluation_id(self, request: HttpRequest) -> str:
current_date = datetime.now()
date_representation = current_date.strftime("%m-%d_%H.%M")
evaluation_id = f"manual_evaluation_at_{date_representation}"
return evaluation_id
Args:
request: The request sent over HTTP
def get(self, request: HttpRequest) -> HttpResponse:
Returns:
A response containing HTML information needed to show the Evaluation Ready page
"""
context = {
"evaluation_template": self.get_evaluation_template(),
"evaluation_template": get_evaluation_template(),
"launch_url": "/ws/launch",
"metrics_url": "/evaluation_service/metrics",
"generated_evaluation_id": self._generate_evaluation_id(request),
"generated_evaluation_id": generate_evaluation_id(),
"evaluation_id_pattern": EVALUATION_ID_PATTERN,
"geometry_name": "",
"show_map": True,
Expand Down
Loading

0 comments on commit 0dd12ed

Please sign in to comment.