Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to IPConnection #26

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 62 additions & 22 deletions src/fastcs/connections/ip_connection.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,41 @@
import asyncio
import codecs
from dataclasses import dataclass
from typing import Any, Callable, Coroutine, cast

_AsyncFuncType = Callable[..., Coroutine[Any, Any, Any]]


def _with_lock(func: _AsyncFuncType) -> _AsyncFuncType:
async def with_lock(*args: Any, **kwargs: Any) -> None:
self = args[0]
async with self.lock:
await func(*args, **kwargs)

Check warning on line 13 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L11-L13

Added lines #L11 - L13 were not covered by tests

return cast(_AsyncFuncType, with_lock)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would prefer to have to do

with self.lock:
    ...

than this honestly. It is more conventional and it allows minimising the time the lock is held without having to split it out into a function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old implementation wrapped all the contents of the functions in the context manager though. I can see why this would be nice for if only part of the function was in the CM, but I feel like the decorator approach is cleaner if that isn't the case.



def _ensure_connected(func: _AsyncFuncType) -> _AsyncFuncType:
"""
Decorator function to check if the wrapper is connected to the device
before calling the attached function.

Args:
func: Function to call if connected to device

Returns:
The wrapped function.

"""

async def check_connected(*args: Any, **kwargs: Any) -> None:
self = args[0]
if self._reader is None or self._writer is None:
raise DisconnectedError("Need to call connect() before using IPConnection.")

Check warning on line 34 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L32-L34

Added lines #L32 - L34 were not covered by tests
else:
await func(*args, **kwargs)

Check warning on line 36 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L36

Added line #L36 was not covered by tests

return cast(_AsyncFuncType, check_connected)
Copy link
Contributor

@GDYendell GDYendell Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case I would prefer to make a wrapper around StreamWriter and StreamReader (StreamConnection?) where the connection is either open, or an exception is raised. This would simplify the typing and avoid having to do isinstance and is not None everywhere. So, IPConnection would make a StreamConnection, the connection is guaranteed to be open as soon as the object is instantiated and then it can call send and recv without having to check every time if it is valid.

I feel like this shouldn't be necessary. I expect am misunderstanding something here, but the asyncio API for this seems clunky.

Thoughts?

Copy link
Contributor Author

@OCopping OCopping Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did realise this probably does the same thing as SerialConnection does in #24, except that uses the socket module. I feel like that would be the better way to approach this.
It is worth noting that currently is written to be synchronous.



class DisconnectedError(Exception):
Expand All @@ -16,39 +52,43 @@
def __init__(self):
self._reader, self._writer = (None, None)
self._lock = asyncio.Lock()
self.connected: bool = False

Check warning on line 55 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L55

Added line #L55 was not covered by tests

@property
def lock(self) -> asyncio.Lock:
return self._lock

Check warning on line 59 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L59

Added line #L59 was not covered by tests
Copy link
Contributor

@GDYendell GDYendell Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be removed if _with_lock is removed. I am wary that this suggests that it can be generally used outside the class, and it should not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I didn't have this property, I only created it as the linter complained. The intention is that this would only be used in the decorator so wrap the function in the context manager, not use outside the class.


async def connect(self, settings: IPConnectionSettings):
self._reader, self._writer = await asyncio.open_connection(
settings.ip, settings.port
)

def ensure_connected(self):
if self._reader is None or self._writer is None:
raise DisconnectedError("Need to call connect() before using IPConnection.")

async def send_command(self, message) -> None:
async with self._lock:
self.ensure_connected()
await self._send_message(message)
@_with_lock
@_ensure_connected
async def send_command(self, message: str) -> None:
await self._send_message(message)

Check warning on line 69 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L69

Added line #L69 was not covered by tests

async def send_query(self, message) -> str:
async with self._lock:
self.ensure_connected()
await self._send_message(message)
return await self._receive_response()
@_with_lock
@_ensure_connected
async def send_query(self, message: str) -> str:
await self._send_message(message)
return await self._receive_response()

Check warning on line 75 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L74-L75

Added lines #L74 - L75 were not covered by tests

# TODO: Figure out type hinting for connections. TypeGuard fails to work as expected
@_with_lock
@_ensure_connected
async def close(self):
async with self._lock:
self.ensure_connected()
self._writer.close()
await self._writer.wait_closed()
self._reader, self._writer = (None, None)

async def _send_message(self, message) -> None:
self._writer.write(message.encode("utf-8"))
assert isinstance(self._writer, asyncio.StreamWriter)
self._writer.close()
await self._writer.wait_closed()
self._reader, self._writer = (None, None)

Check warning on line 84 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L81-L84

Added lines #L81 - L84 were not covered by tests

async def _send_message(self, message: str) -> None:
assert isinstance(self._writer, asyncio.StreamWriter)
self._writer.write(codecs.encode(message, "utf-8"))

Check warning on line 88 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L87-L88

Added lines #L87 - L88 were not covered by tests
await self._writer.drain()

async def _receive_response(self) -> str:
assert isinstance(self._reader, asyncio.StreamReader)

Check warning on line 92 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L92

Added line #L92 was not covered by tests
data = await self._reader.readline()
return data.decode("utf-8")
return codecs.decode(data, "utf-8")

Check warning on line 94 in src/fastcs/connections/ip_connection.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/connections/ip_connection.py#L94

Added line #L94 was not covered by tests
Loading