From c81945f23531e920ebf99d5c0c8990ee77d9fefa Mon Sep 17 00:00:00 2001 From: edgar Date: Wed, 5 Oct 2022 19:49:15 +0200 Subject: [PATCH 01/29] update reader/writer acoording to public brain api --- .../{ci_python.DISABLEDyml => ci_python.yml} | 4 +- .vscode/settings.json | 9 +++ protos/farm_ng/core/event.proto | 5 ++ py/README.md | 46 ++++++++++++ py/farm_ng/core/events_file_reader.py | 73 +++++++++++++------ py/farm_ng/core/events_file_writer.py | 36 ++++++--- py/farm_ng/core/uri.py | 14 ++-- py/setup.cfg | 2 +- py/setup.py | 7 +- py/tests/tests/test_core.py | 16 ++-- 10 files changed, 159 insertions(+), 53 deletions(-) rename .github/workflows/{ci_python.DISABLEDyml => ci_python.yml} (89%) create mode 100644 .vscode/settings.json create mode 100644 py/README.md diff --git a/.github/workflows/ci_python.DISABLEDyml b/.github/workflows/ci_python.yml similarity index 89% rename from .github/workflows/ci_python.DISABLEDyml rename to .github/workflows/ci_python.yml index 7248878f..e7c9134d 100644 --- a/.github/workflows/ci_python.DISABLEDyml +++ b/.github/workflows/ci_python.yml @@ -22,8 +22,8 @@ jobs: with: python-version: ${{ matrix.python-version }} - name: Install dependencies - working-directory: python + working-directory: py run: pip3 install -e .[dev] - name: Run Tests - working-directory: python + working-directory: py run: pytest -v tests/ --mypy diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..74c179d8 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,9 @@ +{ + "python.testing.pytestArgs": [ + "py" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true, + "python.linting.enabled": true, + "python.linting.pylintEnabled": true +} diff --git a/protos/farm_ng/core/event.proto b/protos/farm_ng/core/event.proto index ed4a1817..64fbdd09 100644 --- a/protos/farm_ng/core/event.proto +++ b/protos/farm_ng/core/event.proto @@ -16,3 +16,8 @@ message Event { repeated Timestamp timestamps = 2; int64 payload_length = 3; } + +// TODO(edgar): remove after transition to new setup +message EventHeader { + string robot_name = 1; +} diff --git a/py/README.md b/py/README.md new file mode 100644 index 00000000..39869297 --- /dev/null +++ b/py/README.md @@ -0,0 +1,46 @@ +# farm-ng-core [python] + +## Install + +Install `pip3` & `virtualenv`: + +```bash +sudo apt-get install python3-pip +sudo pip3 install virtualenv +``` + +Clone the project: + +```bash +git clone https://github.com/farm-ng/farm-ng-core.git +``` + +Move to the python code directory: + +```bash +cd farm-ng-core/py +``` + +Start a virtual environment: + +```bash +# assuming you're already in the amiga-brain-api/ directory +python3 -m venv venv +source venv/bin/activate +``` + +Create and install the `farm_ng.core` package: + +```bash +# install to system +pip3 install . + +# or for development mode +pip3 install -e .[dev] +``` + +Verify that you have installed the package + +```bash +python3 -c "import farm_ng; print(farm_ng.__version__)" +``` diff --git a/py/farm_ng/core/events_file_reader.py b/py/farm_ng/core/events_file_reader.py index 606fddbc..f8362c06 100644 --- a/py/farm_ng/core/events_file_reader.py +++ b/py/farm_ng/core/events_file_reader.py @@ -2,14 +2,16 @@ import sys from collections import defaultdict from pathlib import Path -from typing import Any -from typing import BinaryIO -from typing import Dict +from typing import Any, Tuple +from typing import cast +from typing import DefaultDict +from typing import IO from typing import List from typing import Optional from farm_ng.core import event_pb2 from farm_ng.core.uri import Uri +from farm_ng.core import uri_pb2 class EventsFileReader: @@ -17,31 +19,42 @@ def __init__(self, file_name: Path) -> None: self._file_name = file_name.absolute() assert Path(self._file_name.parents[0]).is_dir() - self._file_stream: Optional[BinaryIO] = None + self._file_stream: Optional[IO] = None # assert self.open() # store the bytes offset in a dictionary where: # - the key is the string representation of the Uri of the message # - the value is a list of offsets to messages with that Uri - self.offsets: Optional[Dict[str, List[int]]] = None + self.offsets: DefaultDict[str, List[int]] = defaultdict(list) # self.compute_offsets() def __repr__(self) -> str: - return f"file_name: {str(self.file_name)}\nfile_stream: {self._file_stream}\nis_open: {self.is_open}" + return ( + f"file_name: {str(self.file_name)}\n" + f"file_stream: {self._file_stream}\nis_open: {self.is_open}" + ) def reset_offsets(self) -> None: self.offsets = defaultdict(list) def uris(self) -> List[Uri]: + if self.offsets is None: + return [] return [Uri.from_string(key) for key in self.offsets.keys()] def has_uri(self, uri) -> bool: + if self.offsets is None: + return False return uri.string() in self.offsets.keys() - # TODO: discuss the api signature - def compute_offsets(self) -> Dict[Uri, List[int]]: + def compute_offsets(self) -> None: if not self.is_open: raise Exception("Reader not open. Please, use reader.open()") + maybe_file_stream = self._file_stream + if maybe_file_stream is None: + return None + file_stream = cast(IO, maybe_file_stream) + # clear, if any the previous offsets self.reset_offsets() @@ -54,16 +67,17 @@ def compute_offsets(self) -> Dict[Uri, List[int]]: assert header is not None current_offset += 1 + header.ByteSize() + header_offset = current_offset while True: - event_len = int.from_bytes(self._file_stream.read(1), sys.byteorder) + event_len = int.from_bytes(file_stream.read(1), sys.byteorder) # end file condition if event_len == 0: - self._file_stream.seek(0) + file_stream.seek(header_offset) break - event: bytes = self._file_stream.read(event_len) + event: bytes = file_stream.read(event_len) event_proto = event_pb2.Event() event_proto.ParseFromString(event) @@ -77,7 +91,7 @@ def compute_offsets(self) -> Dict[Uri, List[int]]: current_offset += 1 + event_len + skip_bytes - self._file_stream.seek(current_offset) + file_stream.seek(current_offset) @property def file_name(self) -> Optional[Path]: @@ -95,6 +109,8 @@ def is_closed(self) -> bool: def open(self) -> bool: self._file_stream = open(self._file_name, "rb") + # can't always compute offsets? + # self.compute_offsets() return self.is_open() def close(self) -> bool: @@ -111,43 +127,56 @@ def num_frames(self, uri: Uri) -> int: def seek(self, uri: Uri, frame_id: int) -> None: assert uri.string() in self.offsets.keys() assert frame_id < self.num_frames(uri) - self._file_stream.seek(self.offsets[uri.string()][frame_id]) + maybe_file_stream = self._file_stream + if maybe_file_stream is None: + return None + file_stream = cast(IO, maybe_file_stream) + file_stream.seek(self.offsets[uri.string()][frame_id]) def seek_and_read(self, uri: Uri, frame_idx: int) -> Optional[Any]: self.seek(uri, frame_idx) return self.read() - def read(self) -> Optional[Any]: + def read(self) -> Optional[Tuple[Any, uri_pb2.Uri]]: # Read the message's length as bytes and convert it to an integer - event_len = int.from_bytes(self._file_stream.read(1), sys.byteorder) + maybe_file_stream = self._file_stream + if maybe_file_stream is None: + return None + file_stream = cast(IO, maybe_file_stream) + + event_len = int.from_bytes(file_stream.read(1), sys.byteorder) if event_len == 0: self.close() return None # Read that number of bytes as the message bytes - event: bytes = self._file_stream.read(event_len) + event: bytes = file_stream.read(event_len) event_proto = event_pb2.Event() event_proto.ParseFromString(event) # parse the message message_cls = getattr( - importlib.import_module(event_proto.module), event_proto.name + importlib.import_module(event_proto.uri.scheme), event_proto.uri.authority ) - message: bytes = self._file_stream.read(event_proto.length_next) + message: bytes = file_stream.read(event_proto.payload_length) message_out = message_cls() message_out.ParseFromString(message) - return message_out + return event_proto, message_out def header(self) -> event_pb2.EventHeader: - self._file_stream.seek(0) - header_len = int.from_bytes(self._file_stream.read(1), sys.byteorder) + maybe_file_stream = self._file_stream + if maybe_file_stream is None: + return event_pb2.EventHeader() + file_stream = cast(IO, maybe_file_stream) + file_stream.seek(0) + header_len = int.from_bytes(file_stream.read(1), sys.byteorder) # Read that number of bytes as the message bytes - header: bytes = self._file_stream.read(header_len) + header: bytes = file_stream.read(header_len) header_proto = event_pb2.EventHeader() header_proto.ParseFromString(header) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index e1f433b3..68b1c3ca 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -1,10 +1,12 @@ import sys from pathlib import Path from typing import Any -from typing import BinaryIO +from typing import cast +from typing import IO from typing import Optional from farm_ng.core import event_pb2 +from farm_ng.core import uri_pb2 class EventsFileWriter: @@ -12,10 +14,13 @@ def __init__(self, file_name: Path) -> None: self._file_name = file_name.absolute() assert Path(self._file_name.parents[0]).is_dir() - self._file_stream: Optional[BinaryIO] = None + self._file_stream: Optional[IO] = None def __repr__(self) -> str: - return f"file_name: {str(self.file_name)}\nfile_stream: {self._file_stream}\nis_open: {self.is_open}" + return ( + f"file_name: {str(self.file_name)}\n" + f"file_stream: {self._file_stream}\nis_open: {self.is_open}" + ) @property def file_name(self) -> Optional[Path]: @@ -36,24 +41,31 @@ def open(self) -> bool: return self.is_open() def close(self) -> bool: - self._file_stream.close() + maybe_file_stream = self._file_stream + if maybe_file_stream is None: + return False + file_stream = cast(IO, maybe_file_stream) + + file_stream.close() self._file_stream = None return self.is_closed() - def write(self, message: Any, uri: Optional[event_pb2.Uri] = None) -> None: + def write(self, message: Any, uri: Optional[uri_pb2.Uri] = None) -> None: + maybe_file_stream = self._file_stream + if maybe_file_stream is None: + return None + file_stream = cast(IO, maybe_file_stream) if uri is None: - uri = event_pb2.Uri() + uri = uri_pb2.Uri() event = event_pb2.Event( - module=str(message.__module__), - name=str(message.__class__.__name__), - length_next=message.ByteSize(), uri=uri, + payload_length=message.ByteSize(), ) event_len: bytes = event.ByteSize().to_bytes(1, sys.byteorder) - self._file_stream.write(event_len) - self._file_stream.write(event.SerializeToString()) - self._file_stream.write(message.SerializeToString()) + file_stream.write(event_len) + file_stream.write(event.SerializeToString()) + file_stream.write(message.SerializeToString()) diff --git a/py/farm_ng/core/uri.py b/py/farm_ng/core/uri.py index 189c087f..9c7f9d7b 100644 --- a/py/farm_ng/core/uri.py +++ b/py/farm_ng/core/uri.py @@ -1,8 +1,8 @@ -from farm_ng.core import event_pb2 +from farm_ng.core import uri_pb2 class Uri: - def __init__(self, uri: event_pb2.Uri) -> None: + def __init__(self, uri: uri_pb2.Uri) -> None: self._uri = uri def __eq__(self, other: "Uri") -> bool: @@ -13,17 +13,17 @@ def __repr__(self) -> str: return self.string() @property - def proto(self) -> event_pb2.Uri: + def proto(self) -> uri_pb2.Uri: return self._uri @property def scheme(self) -> str: - scheme_value = event_pb2.UriSchemeType.DESCRIPTOR.values[self._uri.scheme] + scheme_value = uri_pb2.UriSchemeType.DESCRIPTOR.values[self._uri.scheme] return scheme_value.name.lower() @classmethod def empty(cls) -> "Uri": - return Uri(event_pb2.Uri()) + return Uri(uri_pb2.Uri()) def string(self) -> str: return ( @@ -41,11 +41,11 @@ def from_string(self, string: str) -> "Uri": def from_strings( self, scheme_name: str, authority: str, path: str, query: str ) -> "Uri": - scheme_value = event_pb2.UriSchemeType.DESCRIPTOR.values_by_name[ + scheme_value = uri_pb2.UriSchemeType.DESCRIPTOR.values_by_name[ scheme_name.upper() ] return Uri( - event_pb2.Uri( + uri_pb2.Uri( scheme=scheme_value.number, authority=authority, path=path, query=query ) ) diff --git a/py/setup.cfg b/py/setup.cfg index 73ef8f30..c9f2a6df 100644 --- a/py/setup.cfg +++ b/py/setup.cfg @@ -25,7 +25,7 @@ setup_requires = grpcio-tools install_requires = - protobuf<4.0dev + protobuf grpcio tests_require = diff --git a/py/setup.py b/py/setup.py index e96a30bb..2cb43ee5 100644 --- a/py/setup.py +++ b/py/setup.py @@ -7,6 +7,8 @@ from setuptools.command.egg_info import egg_info from setuptools.command.install import install +from grpc_tools import command + class BuildProtosCommand(Command): user_options = [] @@ -18,13 +20,14 @@ def finalize_options(self): pass def run(self): - from grpc_tools import command proto_files_root = Path("../protos") command.build_package_protos(proto_files_root) for proto_file in proto_files_root.rglob("*_pb2*.py"): proto_file.rename(Path("./farm_ng/core") / proto_file.name) + for proto_file in proto_files_root.rglob("*_pb2*.pyi"): + proto_file.rename(Path("./farm_ng/core") / proto_file.name) class CleanFilesCommand(Command): @@ -40,6 +43,8 @@ def run(self): proto_files_root = Path("./farm_ng") for proto_file in proto_files_root.rglob("*_pb2*.py"): assert proto_file.unlink() is None + for proto_file in proto_files_root.rglob("*_pb2*.pyi"): + assert proto_file.unlink() is None class BuildProtosInstall(install): diff --git a/py/tests/tests/test_core.py b/py/tests/tests/test_core.py index 8c2149bd..ffe81324 100644 --- a/py/tests/tests/test_core.py +++ b/py/tests/tests/test_core.py @@ -1,19 +1,19 @@ from pathlib import Path import pytest -from farm_ng.core import event_pb2 +from farm_ng.core import uri_pb2 from farm_ng.core.events_file_reader import EventsFileReader from farm_ng.core.events_file_writer import EventsFileWriter -@pytest.fixture -def writer(tmpdir) -> EventsFileWriter: +@pytest.fixture(name="writer") +def fixture_writer(tmpdir) -> EventsFileWriter: file_name = Path(tmpdir) / "event.log" return EventsFileWriter(file_name) -@pytest.fixture -def reader(tmpdir) -> EventsFileReader: +@pytest.fixture(name="reader") +def fixture_reader(tmpdir) -> EventsFileReader: file_name = Path(tmpdir) / "event.log" return EventsFileReader(file_name) @@ -39,7 +39,7 @@ def test_open_close(self, writer: EventsFileWriter) -> None: def test_write_images(self, writer: EventsFileWriter) -> None: assert writer.open() - uri = event_pb2.Uri() + uri = uri_pb2.Uri() writer.write(uri) assert writer.close() @@ -73,11 +73,11 @@ def test_write_read( ) -> None: # write file assert writer.open() - uri = event_pb2.Uri() + uri = uri_pb2.Uri(scheme="farm_ng.core.uri_pb2", authority="Uri") writer.write(uri, uri) assert writer.close() # read back the data assert reader.open() - uri_out = reader.read() + _, uri_out = reader.read() assert reader.close() assert uri == uri_out From c68fc032450029e79751d2876d33f94d93e126ce Mon Sep 17 00:00:00 2001 From: edgar Date: Wed, 5 Oct 2022 19:50:47 +0200 Subject: [PATCH 02/29] update reader/writer acoording to public brain api --- py/setup.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/py/setup.py b/py/setup.py index 2cb43ee5..2fdfa42d 100644 --- a/py/setup.py +++ b/py/setup.py @@ -7,8 +7,6 @@ from setuptools.command.egg_info import egg_info from setuptools.command.install import install -from grpc_tools import command - class BuildProtosCommand(Command): user_options = [] @@ -20,6 +18,7 @@ def finalize_options(self): pass def run(self): + from grpc_tools import command proto_files_root = Path("../protos") command.build_package_protos(proto_files_root) From 3bef92757b587a79336fdec489edfb1b28c64b66 Mon Sep 17 00:00:00 2001 From: edgar Date: Wed, 5 Oct 2022 20:01:00 +0200 Subject: [PATCH 03/29] disable mypy --- .github/workflows/ci_python.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci_python.yml b/.github/workflows/ci_python.yml index e7c9134d..fe52f7ed 100644 --- a/.github/workflows/ci_python.yml +++ b/.github/workflows/ci_python.yml @@ -26,4 +26,6 @@ jobs: run: pip3 install -e .[dev] - name: Run Tests working-directory: py - run: pytest -v tests/ --mypy + # TODO: enable mypy later + # run: pytest -v tests/ --mypy + run: pytest -v tests/ From 47a01f583e95a4c047da9151325a01e03c2d388f Mon Sep 17 00:00:00 2001 From: edgar Date: Wed, 5 Oct 2022 20:23:38 +0200 Subject: [PATCH 04/29] update name to farm_ng_core --- py/setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/setup.cfg b/py/setup.cfg index c9f2a6df..2eaaa2c5 100644 --- a/py/setup.cfg +++ b/py/setup.cfg @@ -1,5 +1,5 @@ [metadata] -name = farm_ng +name = farm_ng_core version = 0.0.1-dev description = long_description = file: README.md From 6d1c26fe2a5acbdc1c8d4981977bd94bb02d2b73 Mon Sep 17 00:00:00 2001 From: Edgar Riba Date: Wed, 5 Oct 2022 23:45:26 +0200 Subject: [PATCH 05/29] Undo renaming --- py/setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/setup.cfg b/py/setup.cfg index 2eaaa2c5..c9f2a6df 100644 --- a/py/setup.cfg +++ b/py/setup.cfg @@ -1,5 +1,5 @@ [metadata] -name = farm_ng_core +name = farm_ng version = 0.0.1-dev description = long_description = file: README.md From 7bfe49492cfc7fc78333fe5e93d279c61a484419 Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 6 Oct 2022 11:57:04 +0200 Subject: [PATCH 06/29] import only core --- py/farm_ng/__init__.py | 2 -- py/setup.cfg | 1 - 2 files changed, 3 deletions(-) diff --git a/py/farm_ng/__init__.py b/py/farm_ng/__init__.py index 6faa2c18..471eecce 100644 --- a/py/farm_ng/__init__.py +++ b/py/farm_ng/__init__.py @@ -1,5 +1,3 @@ -from farm_ng import core - # Version variable import sys diff --git a/py/setup.cfg b/py/setup.cfg index c9f2a6df..45cdefb0 100644 --- a/py/setup.cfg +++ b/py/setup.cfg @@ -36,7 +36,6 @@ tests_require = test_suite = tests packages = - farm_ng farm_ng.core [options.extras_require] From 0b70815db6374297324e9eca48860b60286765c7 Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 6 Oct 2022 14:45:39 +0200 Subject: [PATCH 07/29] rename to farm_ng_core --- py/setup.cfg | 8 +++++--- py/setup.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/py/setup.cfg b/py/setup.cfg index 45cdefb0..6bdc29a8 100644 --- a/py/setup.cfg +++ b/py/setup.cfg @@ -1,12 +1,12 @@ [metadata] -name = farm_ng +name = farm_ng_core version = 0.0.1-dev description = long_description = file: README.md author = Farm-ng Inc. author_email = info@farm-ng.com -url = https://github.com/farm-ng/amiga-dev-kit -download_url = https://github.com/farm-ng/amiga-dev-kit +url = https://github.com/farm-ng/farm-ng-core +download_url = https://github.com/farm-ng/farm-ng-core keywords = robotics, open-source license_files = LICENSE classifiers = @@ -37,6 +37,8 @@ test_suite = tests packages = farm_ng.core +package_dir = + farm_ng.core = farm_ng/core [options.extras_require] dev = diff --git a/py/setup.py b/py/setup.py index 2fdfa42d..fa9af91f 100644 --- a/py/setup.py +++ b/py/setup.py @@ -79,5 +79,5 @@ def run(self): "develop": BuildProtosDevelop, "egg_info": BuildProtosEggInfo, "clean": CleanFilesCommand, - } + }, ) From 35781864ccbd1aea207be15f244e1f2106b6875f Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 6 Oct 2022 15:00:58 +0200 Subject: [PATCH 08/29] undo package_dir --- py/setup.cfg | 2 -- 1 file changed, 2 deletions(-) diff --git a/py/setup.cfg b/py/setup.cfg index 6bdc29a8..19979967 100644 --- a/py/setup.cfg +++ b/py/setup.cfg @@ -37,8 +37,6 @@ test_suite = tests packages = farm_ng.core -package_dir = - farm_ng.core = farm_ng/core [options.extras_require] dev = From 90dd8bd482391dd002ac89ddea4c9e675a4331da Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 6 Oct 2022 15:32:19 +0200 Subject: [PATCH 09/29] remove init --- py/farm_ng/__init__.py | 9 --------- 1 file changed, 9 deletions(-) delete mode 100644 py/farm_ng/__init__.py diff --git a/py/farm_ng/__init__.py b/py/farm_ng/__init__.py deleted file mode 100644 index 471eecce..00000000 --- a/py/farm_ng/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# Version variable -import sys - -if sys.version_info >= (3, 8): # pragma: >=3.8 cover - import importlib.metadata as importlib_metadata -else: # pragma: <3.8 cover - import importlib_metadata - -__version__ = importlib_metadata.version("farm_ng") From 9671abd3380662b18e23526fbfa4153b74ce6668 Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 6 Oct 2022 17:06:59 +0200 Subject: [PATCH 10/29] set version --- py/farm_ng/core/__init__.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/py/farm_ng/core/__init__.py b/py/farm_ng/core/__init__.py index e69de29b..471eecce 100644 --- a/py/farm_ng/core/__init__.py +++ b/py/farm_ng/core/__init__.py @@ -0,0 +1,9 @@ +# Version variable +import sys + +if sys.version_info >= (3, 8): # pragma: >=3.8 cover + import importlib.metadata as importlib_metadata +else: # pragma: <3.8 cover + import importlib_metadata + +__version__ = importlib_metadata.version("farm_ng") From 427eb372bdd89f122d79fc33d56e1054b48f9ac2 Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 6 Oct 2022 18:22:07 +0200 Subject: [PATCH 11/29] fix proto linkage --- py/setup.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/py/setup.py b/py/setup.py index fa9af91f..384493f8 100644 --- a/py/setup.py +++ b/py/setup.py @@ -24,9 +24,17 @@ def run(self): command.build_package_protos(proto_files_root) for proto_file in proto_files_root.rglob("*_pb2*.py"): - proto_file.rename(Path("./farm_ng/core") / proto_file.name) + proto_file_new = Path(*proto_file.parts[2:]) + if not proto_file_new.exists(): + proto_file.rename(proto_file_new) + if proto_file.exists(): + proto_file.unlink() for proto_file in proto_files_root.rglob("*_pb2*.pyi"): - proto_file.rename(Path("./farm_ng/core") / proto_file.name) + proto_file_new = Path(*proto_file.parts[2:]) + if not proto_file_new.exists(): + proto_file.rename(proto_file_new) + if proto_file.exists(): + proto_file.unlink() class CleanFilesCommand(Command): From 0cbca145728fda5af3d5a892597569f4b87a7a14 Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 6 Oct 2022 18:29:22 +0200 Subject: [PATCH 12/29] set version for farm_ng_core --- py/farm_ng/core/__init__.py | 2 +- py/tests/tests/test_package.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/py/farm_ng/core/__init__.py b/py/farm_ng/core/__init__.py index 471eecce..b609465d 100644 --- a/py/farm_ng/core/__init__.py +++ b/py/farm_ng/core/__init__.py @@ -6,4 +6,4 @@ else: # pragma: <3.8 cover import importlib_metadata -__version__ = importlib_metadata.version("farm_ng") +__version__ = importlib_metadata.version("farm_ng_core") diff --git a/py/tests/tests/test_package.py b/py/tests/tests/test_package.py index c4c21ce5..166a03a9 100644 --- a/py/tests/tests/test_package.py +++ b/py/tests/tests/test_package.py @@ -2,5 +2,5 @@ def test_import() -> None: - assert farm_ng.__version__ is not None assert farm_ng.core is not None + assert farm_ng.core.__version__ is not None From 8fd7766c539a44f90986e636f539b9b86b69614e Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 6 Oct 2022 18:33:24 +0200 Subject: [PATCH 13/29] fix readme --- py/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/README.md b/py/README.md index 39869297..6d1fb0f2 100644 --- a/py/README.md +++ b/py/README.md @@ -24,7 +24,7 @@ cd farm-ng-core/py Start a virtual environment: ```bash -# assuming you're already in the amiga-brain-api/ directory +# assuming you're already in farm-ng-core/py directory python3 -m venv venv source venv/bin/activate ``` From 04a6ebed1f4a5b77bcb19b8b0ff6f2137cb2ad98 Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 6 Oct 2022 19:04:21 +0200 Subject: [PATCH 14/29] sync reader from workspace --- protos/farm_ng/core/event.proto | 5 -- py/farm_ng/core/events_file_reader.py | 92 ++++++++++----------------- py/farm_ng/core/uri.py | 13 +--- 3 files changed, 36 insertions(+), 74 deletions(-) diff --git a/protos/farm_ng/core/event.proto b/protos/farm_ng/core/event.proto index 64fbdd09..ed4a1817 100644 --- a/protos/farm_ng/core/event.proto +++ b/protos/farm_ng/core/event.proto @@ -16,8 +16,3 @@ message Event { repeated Timestamp timestamps = 2; int64 payload_length = 3; } - -// TODO(edgar): remove after transition to new setup -message EventHeader { - string robot_name = 1; -} diff --git a/py/farm_ng/core/events_file_reader.py b/py/farm_ng/core/events_file_reader.py index f8362c06..f69fdc8e 100644 --- a/py/farm_ng/core/events_file_reader.py +++ b/py/farm_ng/core/events_file_reader.py @@ -8,6 +8,7 @@ from typing import IO from typing import List from typing import Optional +import logging from farm_ng.core import event_pb2 from farm_ng.core.uri import Uri @@ -16,6 +17,8 @@ class EventsFileReader: def __init__(self, file_name: Path) -> None: + self.logger = logging.getLogger(self.__class__.__name__) + self._file_name = file_name.absolute() assert Path(self._file_name.parents[0]).is_dir() @@ -25,7 +28,6 @@ def __init__(self, file_name: Path) -> None: # - the key is the string representation of the Uri of the message # - the value is a list of offsets to messages with that Uri self.offsets: DefaultDict[str, List[int]] = defaultdict(list) - # self.compute_offsets() def __repr__(self) -> str: return ( @@ -46,52 +48,39 @@ def has_uri(self, uri) -> bool: return False return uri.string() in self.offsets.keys() - def compute_offsets(self) -> None: - if not self.is_open: - raise Exception("Reader not open. Please, use reader.open()") - - maybe_file_stream = self._file_stream - if maybe_file_stream is None: - return None - file_stream = cast(IO, maybe_file_stream) + def _read_next_event(self) -> Optional[event_pb2.Event]: + event_len = int.from_bytes(self._file_stream.read(1), sys.byteorder) + # end file condition + if event_len == 0: + self._file_stream.seek(0) + return - # clear, if any the previous offsets - self.reset_offsets() + event_bytes: bytes = self._file_stream.read(event_len) - current_offset: int = 0 + event = event_pb2.Event() + event.ParseFromString(event_bytes) + return event - skip_bytes: int = 0 + def _skip_next_message(self, event: event_pb2.Event) -> None: + msg_bytes: int = event.payload_length + self._file_stream.seek(msg_bytes, 1) - # reade first the header - header = self.header() - assert header is not None + def _compute_offsets(self) -> None: + if not self.is_open: + raise Exception("Reader not open. Please, use reader.open()") - current_offset += 1 + header.ByteSize() - header_offset = current_offset + # clear, if any the previous offsets + self.reset_offsets() while True: - - event_len = int.from_bytes(file_stream.read(1), sys.byteorder) - # end file condition - if event_len == 0: - file_stream.seek(header_offset) - break - - event: bytes = file_stream.read(event_len) - - event_proto = event_pb2.Event() - event_proto.ParseFromString(event) - - uri = Uri(event_proto.uri) + current_offset = self._file_stream.tell() + maybe_event = self._read_next_event() + if maybe_event is None: + return + event = cast(event_pb2.Event, maybe_event) + uri = Uri(event.uri) self.offsets[uri.string()].append(current_offset) - - # skip the main message decoding - # message: bytes = self._file_stream.read(event_proto.length_next) - skip_bytes = event_proto.length_next - - current_offset += 1 + event_len + skip_bytes - - file_stream.seek(current_offset) + self._skip_next_message(event) @property def file_name(self) -> Optional[Path]: @@ -109,8 +98,7 @@ def is_closed(self) -> bool: def open(self) -> bool: self._file_stream = open(self._file_name, "rb") - # can't always compute offsets? - # self.compute_offsets() + self._compute_offsets() return self.is_open() def close(self) -> bool: @@ -120,6 +108,8 @@ def close(self) -> bool: return self.is_closed() def num_frames(self, uri: Uri) -> int: + if not self.offsets: + self._compute_offsets() if uri.string() not in self.offsets.keys(): return -1 return len(self.offsets[uri.string()]) @@ -129,7 +119,7 @@ def seek(self, uri: Uri, frame_id: int) -> None: assert frame_id < self.num_frames(uri) maybe_file_stream = self._file_stream if maybe_file_stream is None: - return None + return file_stream = cast(IO, maybe_file_stream) file_stream.seek(self.offsets[uri.string()][frame_id]) @@ -147,7 +137,7 @@ def read(self) -> Optional[Tuple[Any, uri_pb2.Uri]]: event_len = int.from_bytes(file_stream.read(1), sys.byteorder) if event_len == 0: self.close() - return None + return # Read that number of bytes as the message bytes event: bytes = file_stream.read(event_len) @@ -166,19 +156,3 @@ def read(self) -> Optional[Tuple[Any, uri_pb2.Uri]]: message_out.ParseFromString(message) return event_proto, message_out - - def header(self) -> event_pb2.EventHeader: - maybe_file_stream = self._file_stream - if maybe_file_stream is None: - return event_pb2.EventHeader() - file_stream = cast(IO, maybe_file_stream) - file_stream.seek(0) - header_len = int.from_bytes(file_stream.read(1), sys.byteorder) - - # Read that number of bytes as the message bytes - header: bytes = file_stream.read(header_len) - - header_proto = event_pb2.EventHeader() - header_proto.ParseFromString(header) - - return header_proto diff --git a/py/farm_ng/core/uri.py b/py/farm_ng/core/uri.py index 9c7f9d7b..360b4bbc 100644 --- a/py/farm_ng/core/uri.py +++ b/py/farm_ng/core/uri.py @@ -16,22 +16,15 @@ def __repr__(self) -> str: def proto(self) -> uri_pb2.Uri: return self._uri - @property - def scheme(self) -> str: - scheme_value = uri_pb2.UriSchemeType.DESCRIPTOR.values[self._uri.scheme] - return scheme_value.name.lower() - @classmethod def empty(cls) -> "Uri": return Uri(uri_pb2.Uri()) def string(self) -> str: - return ( - f"{self.scheme}://{self._uri.authority}/{self._uri.path}?{self._uri.query}" - ) + return f"{self.proto.scheme}://{self.proto.authority}/{self.proto.path}?{self.proto.query}" @classmethod - def from_string(self, string: str) -> "Uri": + def from_string(cls, string: str) -> "Uri": scheme_name, remainder = string.split("://") remainder, query = remainder.split("?") authority, path = remainder.split("/") @@ -39,7 +32,7 @@ def from_string(self, string: str) -> "Uri": @classmethod def from_strings( - self, scheme_name: str, authority: str, path: str, query: str + cls, scheme_name: str, authority: str, path: str, query: str ) -> "Uri": scheme_value = uri_pb2.UriSchemeType.DESCRIPTOR.values_by_name[ scheme_name.upper() From 9fff4d4a4c3fe196a8cfccea738e35fc56c59cb4 Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 6 Oct 2022 21:28:59 +0200 Subject: [PATCH 15/29] parse protobuf descriptor --- py/farm_ng/core/events_file_reader.py | 32 ++++++++++++++++----------- py/farm_ng/core/events_file_writer.py | 11 ++++----- py/farm_ng/core/uri.py | 15 ++----------- py/tests/tests/test_core.py | 17 ++++++++++---- 4 files changed, 40 insertions(+), 35 deletions(-) diff --git a/py/farm_ng/core/events_file_reader.py b/py/farm_ng/core/events_file_reader.py index f69fdc8e..3d10ca7c 100644 --- a/py/farm_ng/core/events_file_reader.py +++ b/py/farm_ng/core/events_file_reader.py @@ -1,4 +1,5 @@ import importlib +from importlib.resources import Package import sys from collections import defaultdict from pathlib import Path @@ -15,6 +16,16 @@ from farm_ng.core import uri_pb2 +def parse_protobuf_descriptor(desc) -> Tuple[str, str]: + # parse a proto descriptor and extract the message name and package. + # See: https://developers.google.com/protocol-buffers/docs/reference/python-generated#invocation + # NOTE: the descriptor comes in the shape of `farm_ng.core.proto.Timestamp` + desc_list = desc.split(".") + name = desc_list[-1] + package = desc_list[:-2] + [name.lower() + "_pb2"] + return name, ".".join(package) + + class EventsFileReader: def __init__(self, file_name: Path) -> None: self.logger = logging.getLogger(self.__class__.__name__) @@ -49,11 +60,11 @@ def has_uri(self, uri) -> bool: return uri.string() in self.offsets.keys() def _read_next_event(self) -> Optional[event_pb2.Event]: - event_len = int.from_bytes(self._file_stream.read(1), sys.byteorder) + event_len = int.from_bytes(self._file_stream.read(4), sys.byteorder) # end file condition if event_len == 0: self._file_stream.seek(0) - return + return None event_bytes: bytes = self._file_stream.read(event_len) @@ -63,7 +74,7 @@ def _read_next_event(self) -> Optional[event_pb2.Event]: def _skip_next_message(self, event: event_pb2.Event) -> None: msg_bytes: int = event.payload_length - self._file_stream.seek(msg_bytes, 1) + self._file_stream.seek(msg_bytes, 4) def _compute_offsets(self) -> None: if not self.is_open: @@ -123,21 +134,17 @@ def seek(self, uri: Uri, frame_id: int) -> None: file_stream = cast(IO, maybe_file_stream) file_stream.seek(self.offsets[uri.string()][frame_id]) - def seek_and_read(self, uri: Uri, frame_idx: int) -> Optional[Any]: - self.seek(uri, frame_idx) - return self.read() - - def read(self) -> Optional[Tuple[Any, uri_pb2.Uri]]: + def read(self) -> Optional[Tuple[event_pb2.Event, Any]]: # Read the message's length as bytes and convert it to an integer maybe_file_stream = self._file_stream if maybe_file_stream is None: return None file_stream = cast(IO, maybe_file_stream) - event_len = int.from_bytes(file_stream.read(1), sys.byteorder) + event_len = int.from_bytes(file_stream.read(4), sys.byteorder) if event_len == 0: self.close() - return + return None # Read that number of bytes as the message bytes event: bytes = file_stream.read(event_len) @@ -146,9 +153,8 @@ def read(self) -> Optional[Tuple[Any, uri_pb2.Uri]]: event_proto.ParseFromString(event) # parse the message - message_cls = getattr( - importlib.import_module(event_proto.uri.scheme), event_proto.uri.authority - ) + name, package = parse_protobuf_descriptor(event_proto.uri.scheme) + message_cls = getattr(importlib.import_module(package), name) message: bytes = file_stream.read(event_proto.payload_length) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 68b1c3ca..51f11dba 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -50,21 +50,22 @@ def close(self) -> bool: self._file_stream = None return self.is_closed() - def write(self, message: Any, uri: Optional[uri_pb2.Uri] = None) -> None: + def write(self, message: Any) -> None: maybe_file_stream = self._file_stream if maybe_file_stream is None: - return None + return file_stream = cast(IO, maybe_file_stream) - if uri is None: - uri = uri_pb2.Uri() + # create a Uri to store the descriptor of the message + # it comes in the form of `farm_ng.core.proto.Timestamp` + uri = uri_pb2.Uri(scheme=message.DESCRIPTOR.full_name) event = event_pb2.Event( uri=uri, payload_length=message.ByteSize(), ) - event_len: bytes = event.ByteSize().to_bytes(1, sys.byteorder) + event_len: bytes = event.ByteSize().to_bytes(4, sys.byteorder) file_stream.write(event_len) file_stream.write(event.SerializeToString()) diff --git a/py/farm_ng/core/uri.py b/py/farm_ng/core/uri.py index 360b4bbc..f3e448a1 100644 --- a/py/farm_ng/core/uri.py +++ b/py/farm_ng/core/uri.py @@ -28,17 +28,6 @@ def from_string(cls, string: str) -> "Uri": scheme_name, remainder = string.split("://") remainder, query = remainder.split("?") authority, path = remainder.split("/") - return Uri.from_strings(scheme_name, authority, path, query) - - @classmethod - def from_strings( - cls, scheme_name: str, authority: str, path: str, query: str - ) -> "Uri": - scheme_value = uri_pb2.UriSchemeType.DESCRIPTOR.values_by_name[ - scheme_name.upper() - ] - return Uri( - uri_pb2.Uri( - scheme=scheme_value.number, authority=authority, path=path, query=query - ) + return cls( + uri_pb2.Uri(scheme=scheme_name, authority=authority, path=path, query=query) ) diff --git a/py/tests/tests/test_core.py b/py/tests/tests/test_core.py index ffe81324..29b6bfd6 100644 --- a/py/tests/tests/test_core.py +++ b/py/tests/tests/test_core.py @@ -2,6 +2,7 @@ import pytest from farm_ng.core import uri_pb2 +from farm_ng.core import timestamp_pb2 from farm_ng.core.events_file_reader import EventsFileReader from farm_ng.core.events_file_writer import EventsFileWriter @@ -73,11 +74,19 @@ def test_write_read( ) -> None: # write file assert writer.open() - uri = uri_pb2.Uri(scheme="farm_ng.core.uri_pb2", authority="Uri") - writer.write(uri, uri) + for i in range(1, 10): + time_stamp = timestamp_pb2.Timestamp(stamp=i) + writer.write(time_stamp) assert writer.close() # read back the data assert reader.open() - _, uri_out = reader.read() + count = 1 + while True: + res = reader.read() + if res is None: + break + _, msg = res + assert msg.stamp == count + count += 1 + assert reader.close() - assert uri == uri_out From 89fd065e858f5cca4cddaecc8b80671970d49ac7 Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Sun, 9 Oct 2022 11:08:12 -0700 Subject: [PATCH 16/29] use v0.1.1 of cmake. --- super_project/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/super_project/CMakeLists.txt b/super_project/CMakeLists.txt index 9a4e7bde..8b85f250 100644 --- a/super_project/CMakeLists.txt +++ b/super_project/CMakeLists.txt @@ -6,7 +6,7 @@ include(FetchContent) FetchContent_Declare( farm_ng_cmake GIT_REPOSITORY https://github.com/farm-ng/farm-ng-cmake.git - GIT_TAG 91e279de02ef3ca65c2dab17bf772ea31eb91de2 + GIT_TAG v0.1.1 ) FetchContent_MakeAvailable(farm_ng_cmake) From c056595be73f6943adebad38d95afe22fabe2c88 Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Sun, 9 Oct 2022 12:29:57 -0700 Subject: [PATCH 17/29] WIP fixing up format. --- .gitignore | 1 + .pylintrc | 3 + py/farm_ng/core/events_file_writer.py | 80 ++++++++++++++++----------- py/farm_ng/core/stamp.py | 22 ++++++++ py/setup.cfg | 5 ++ py/tests/tests/test_core.py | 27 ++++----- 6 files changed, 88 insertions(+), 50 deletions(-) create mode 100644 py/farm_ng/core/stamp.py diff --git a/.gitignore b/.gitignore index 378eac25..8432802f 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ build +venv diff --git a/.pylintrc b/.pylintrc index b2a5a3bb..f63195fd 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,2 +1,5 @@ +[MASTER] +load-plugins=pylint_protobuf + [MESSAGES CONTROL] disable=missing-module-docstring,missing-class-docstring,missing-function-docstring diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 51f11dba..e6e611fd 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -1,21 +1,33 @@ -import sys +import struct from pathlib import Path -from typing import Any from typing import cast from typing import IO from typing import Optional +from typing import List +import time +from farm_ng.core.stamp import get_monotonic_now +from google.protobuf.message import Message -from farm_ng.core import event_pb2 -from farm_ng.core import uri_pb2 +# pylint can't find Event or Uri in protobuf generated files +# https://github.com/protocolbuffers/protobuf/issues/10372 +from farm_ng.core.event_pb2 import Event +from farm_ng.core.uri_pb2 import Uri +from farm_ng.core.timestamp_pb2 import Timestamp class EventsFileWriter: def __init__(self, file_name: Path) -> None: - self._file_name = file_name.absolute() + self._file_name: Path = file_name.absolute() assert Path(self._file_name.parents[0]).is_dir() - self._file_stream: Optional[IO] = None + def __enter__(self): + self.open() + return self + + def __exit__(self, type, value, traceback): + self.close() + def __repr__(self) -> str: return ( f"file_name: {str(self.file_name)}\n" @@ -23,10 +35,8 @@ def __repr__(self) -> str: ) @property - def file_name(self) -> Optional[Path]: - if self._file_stream is None: - return None - return Path(self._file_stream.name) + def file_name(self) -> Path: + return self._file_name def is_open(self) -> bool: return not self.is_closed() @@ -41,32 +51,36 @@ def open(self) -> bool: return self.is_open() def close(self) -> bool: - maybe_file_stream = self._file_stream - if maybe_file_stream is None: - return False - file_stream = cast(IO, maybe_file_stream) - + if self.is_closed(): + return True + file_stream = cast(IO, self._file_stream) file_stream.close() self._file_stream = None return self.is_closed() - def write(self, message: Any) -> None: - maybe_file_stream = self._file_stream - if maybe_file_stream is None: - return - file_stream = cast(IO, maybe_file_stream) - - # create a Uri to store the descriptor of the message - # it comes in the form of `farm_ng.core.proto.Timestamp` - uri = uri_pb2.Uri(scheme=message.DESCRIPTOR.full_name) - - event = event_pb2.Event( - uri=uri, - payload_length=message.ByteSize(), + def make_write_stamp(self) -> Timestamp: + return get_monotonic_now(semantics="events_file/write") + + def write( + self, uri: Uri, message: Message, timestamps: Optional[List[Timestamp]] = None + ) -> None: + assert self.is_open(), ("Event log is not open:", self.file_name) + if timestamps is None: + timestamps = [] + timestamps.append(self.make_write_stamp()) + file_stream = cast(IO, self._file_stream) + assert uri.scheme == message.DESCRIPTOR.full_name, ( + uri.scheme, + message.DESCRIPTOR.full_name, ) - - event_len: bytes = event.ByteSize().to_bytes(4, sys.byteorder) - + payload = message.SerializeToString() + event = Event( + uri=uri, + timestamps=timestamps, + payload_length=len(payload), + ).SerializeToString() + # note that < (little endian), I (4 bytes unsigned integer) + event_len: bytes = struct.pack(" None: + global _g_host_name + _g_host_name = name + + +def get_host_name() -> str: + return _g_host_name + + +def get_monotonic_now(semantics: str) -> Timestamp: + return Timestamp( + stamp=time.monotonic(), + clock_name=get_host_name() + "/monotonic", + semantics=semantics, + ) diff --git a/py/setup.cfg b/py/setup.cfg index 19979967..1ef03dc4 100644 --- a/py/setup.cfg +++ b/py/setup.cfg @@ -45,6 +45,11 @@ dev = pytest-mypy==0.9.1 pre-commit==2.20.0 mypy==0.971 + types-protobuf + pylint + grpcio-tools + mypy-protobuf + pylint-protobuf==0.20.2 [mypy] files = tests, examples diff --git a/py/tests/tests/test_core.py b/py/tests/tests/test_core.py index 29b6bfd6..32cbfa7a 100644 --- a/py/tests/tests/test_core.py +++ b/py/tests/tests/test_core.py @@ -7,27 +7,20 @@ from farm_ng.core.events_file_writer import EventsFileWriter -@pytest.fixture(name="writer") -def fixture_writer(tmpdir) -> EventsFileWriter: - file_name = Path(tmpdir) / "event.log" - return EventsFileWriter(file_name) - - -@pytest.fixture(name="reader") -def fixture_reader(tmpdir) -> EventsFileReader: - file_name = Path(tmpdir) / "event.log" - return EventsFileReader(file_name) +@pytest.fixture(name="log_file") +def fixture_writer(tmpdir) -> Path: + return Path(tmpdir) / "event.log" class TestEventsWriter: - def test_smoke(self, writer: EventsFileWriter) -> None: - # empty object - assert writer.is_closed() - assert not writer.is_open() - assert writer.file_name is None + def test_smoke(self, log_file: Path) -> None: + with EventsFileWriter(log_file) as writer: + assert writer.is_open() + assert writer.file_name == log_file - def test_open_close(self, writer: EventsFileWriter) -> None: + def test_open_close(self, log_file: Path) -> None: # open the file + writer = EventsFileWriter(log_file) assert writer.open() assert not writer.is_closed() assert writer.is_open() @@ -36,7 +29,7 @@ def test_open_close(self, writer: EventsFileWriter) -> None: assert writer.close() assert writer.is_closed() assert not writer.is_open() - assert writer.file_name is None + assert writer.file_name == log_file def test_write_images(self, writer: EventsFileWriter) -> None: assert writer.open() From b1aecd85f7daf4d27bf7e6b813f002aa8547a121 Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Sun, 9 Oct 2022 20:40:33 -0700 Subject: [PATCH 18/29] Test passing. --- py/farm_ng/core/events_file_reader.py | 192 ++++++++++++++------------ py/farm_ng/core/events_file_writer.py | 41 +++--- py/farm_ng/core/stamp.py | 13 +- py/farm_ng/core/uri.py | 53 +++---- py/tests/tests/test_core.py | 77 +++++------ 5 files changed, 188 insertions(+), 188 deletions(-) diff --git a/py/farm_ng/core/events_file_reader.py b/py/farm_ng/core/events_file_reader.py index 3d10ca7c..4fffa073 100644 --- a/py/farm_ng/core/events_file_reader.py +++ b/py/farm_ng/core/events_file_reader.py @@ -9,17 +9,21 @@ from typing import IO from typing import List from typing import Optional -import logging +import struct +import os +from farm_ng.core.event_pb2 import Event +from farm_ng.core.uri_pb2 import Uri +from farm_ng.core.uri import uri_to_string, string_to_uri -from farm_ng.core import event_pb2 -from farm_ng.core.uri import Uri -from farm_ng.core import uri_pb2 - -def parse_protobuf_descriptor(desc) -> Tuple[str, str]: +def parse_protobuf_descriptor(uri: Uri) -> Tuple[str, str]: + assert uri.scheme == "protobuf" # parse a proto descriptor and extract the message name and package. # See: https://developers.google.com/protocol-buffers/docs/reference/python-generated#invocation # NOTE: the descriptor comes in the shape of `farm_ng.core.proto.Timestamp` + type_split = uri.query.split("type=") + assert len(type_split) == 2 + desc = type_split[1].split("&")[0] # query string can have multiple key/value pairs desc_list = desc.split(".") name = desc_list[-1] package = desc_list[:-2] + [name.lower() + "_pb2"] @@ -28,137 +32,143 @@ def parse_protobuf_descriptor(desc) -> Tuple[str, str]: class EventsFileReader: def __init__(self, file_name: Path) -> None: - self.logger = logging.getLogger(self.__class__.__name__) - self._file_name = file_name.absolute() - assert Path(self._file_name.parents[0]).is_dir() - + assert Path(self._file_name.parents[0]).is_dir(), self._file_name + assert self._file_name.exists(), self._file_name self._file_stream: Optional[IO] = None + self._file_length: int = 0 + # assert self.open() # store the bytes offset in a dictionary where: # - the key is the string representation of the Uri of the message # - the value is a list of offsets to messages with that Uri - self.offsets: DefaultDict[str, List[int]] = defaultdict(list) + self.offsets: DefaultDict[Uri, List[int]] = defaultdict(list) + + def __enter__(self): + self.open() + return self + + def __exit__(self, type, value, traceback): + self.close() def __repr__(self) -> str: return ( - f"file_name: {str(self.file_name)}\n" - f"file_stream: {self._file_stream}\nis_open: {self.is_open}" + f"file_name: {str(self.file_name)} " + + f"file_stream: {self._file_stream} " + + f"is_open: {self.is_open()} " + + f"file_length: {self.file_length}" ) + @property + def file_name(self) -> Optional[Path]: + if self._file_stream is None: + return None + return Path(self._file_stream.name) + + def is_open(self) -> bool: + return not self.is_closed() + + def is_closed(self) -> bool: + if self._file_stream is None: + return True + return self._file_stream.closed + + def open(self) -> bool: + self._file_stream = open(self._file_name, "rb") + self._file_length = os.path.getsize(self._file_name) + self._compute_offsets() + return self.is_open() + + def close(self) -> bool: + if self.is_closed(): + return True + file_stream = cast(IO, self._file_stream) + file_stream.close() + self._file_stream = None + return self.is_closed() + def reset_offsets(self) -> None: self.offsets = defaultdict(list) def uris(self) -> List[Uri]: if self.offsets is None: return [] - return [Uri.from_string(key) for key in self.offsets.keys()] + return [string_to_uri(key) for key in self.offsets.keys()] - def has_uri(self, uri) -> bool: + def has_uri(self, uri: Uri) -> bool: if self.offsets is None: return False - return uri.string() in self.offsets.keys() + return uri_to_string(uri) in self.offsets.keys() - def _read_next_event(self) -> Optional[event_pb2.Event]: - event_len = int.from_bytes(self._file_stream.read(4), sys.byteorder) - # end file condition - if event_len == 0: - self._file_stream.seek(0) - return None + def _read_next_event(self) -> Event: + buffer = self._file_stream.read(4) + if len(buffer) != 4: + raise EOFError() + event_len = struct.unpack(" None: + def _skip_next_message(self, event: Event) -> None: msg_bytes: int = event.payload_length - self._file_stream.seek(msg_bytes, 4) + self._file_stream.seek(msg_bytes, 1) def _compute_offsets(self) -> None: - if not self.is_open: + if not self.is_open(): raise Exception("Reader not open. Please, use reader.open()") + file_stream = cast(IO, self._file_stream) + file_stream.seek(0) + # clear, if any the previous offsets self.reset_offsets() - while True: - current_offset = self._file_stream.tell() - maybe_event = self._read_next_event() - if maybe_event is None: - return - event = cast(event_pb2.Event, maybe_event) - uri = Uri(event.uri) - self.offsets[uri.string()].append(current_offset) - self._skip_next_message(event) + current_offset = file_stream.tell() + try: + event = self._read_next_event() + self.offsets[uri_to_string(event.uri)].append(current_offset) + self._skip_next_message(event) + except EOFError: + break - @property - def file_name(self) -> Optional[Path]: - if self._file_stream is None: - return None - return Path(self._file_stream.name) - - def is_open(self) -> bool: - return not self.is_closed() - - def is_closed(self) -> bool: - if self._file_stream is None: - return True - return self._file_stream.closed - - def open(self) -> bool: - self._file_stream = open(self._file_name, "rb") - self._compute_offsets() - return self.is_open() - - def close(self) -> bool: - if self._file_stream is not None: - self._file_stream.close() - self._file_stream = None - return self.is_closed() + file_stream.seek(0) def num_frames(self, uri: Uri) -> int: if not self.offsets: self._compute_offsets() - if uri.string() not in self.offsets.keys(): - return -1 - return len(self.offsets[uri.string()]) + if not self.has_uri(uri): + return 0 + return len(self.offsets[uri]) def seek(self, uri: Uri, frame_id: int) -> None: - assert uri.string() in self.offsets.keys() + assert uri in self.offsets.keys() assert frame_id < self.num_frames(uri) - maybe_file_stream = self._file_stream - if maybe_file_stream is None: - return - file_stream = cast(IO, maybe_file_stream) - file_stream.seek(self.offsets[uri.string()][frame_id]) - - def read(self) -> Optional[Tuple[event_pb2.Event, Any]]: - # Read the message's length as bytes and convert it to an integer - maybe_file_stream = self._file_stream - if maybe_file_stream is None: - return None - file_stream = cast(IO, maybe_file_stream) + assert self.is_open() - event_len = int.from_bytes(file_stream.read(4), sys.byteorder) - if event_len == 0: - self.close() - return None - - # Read that number of bytes as the message bytes - event: bytes = file_stream.read(event_len) + file_stream = cast(IO, self._file_stream) + file_stream.seek(self.offsets[uri_to_string(uri)][frame_id]) - event_proto = event_pb2.Event() - event_proto.ParseFromString(event) + def read(self) -> Tuple[Event, Any]: + assert self.is_open() + event = self._read_next_event() - # parse the message - name, package = parse_protobuf_descriptor(event_proto.uri.scheme) + name, package = parse_protobuf_descriptor(event.uri) message_cls = getattr(importlib.import_module(package), name) - message: bytes = file_stream.read(event_proto.payload_length) + file_stream = cast(IO, self._file_stream) + payload: bytes = file_stream.read(event.payload_length) + + message = message_cls() + message.ParseFromString(payload) - message_out = message_cls() - message_out.ParseFromString(message) + return event, message - return event_proto, message_out + def read_messages(self): + try: + while True: + event, message = self.read() + yield event, message + except EOFError: + pass diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index e6e611fd..e94795af 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -4,8 +4,8 @@ from typing import IO from typing import Optional from typing import List -import time from farm_ng.core.stamp import get_monotonic_now +from farm_ng.core.uri import make_proto_uri from google.protobuf.message import Message # pylint can't find Event or Uri in protobuf generated files @@ -20,6 +20,7 @@ def __init__(self, file_name: Path) -> None: self._file_name: Path = file_name.absolute() assert Path(self._file_name.parents[0]).is_dir() self._file_stream: Optional[IO] = None + self._file_length = 0 def __enter__(self): self.open() @@ -29,10 +30,11 @@ def __exit__(self, type, value, traceback): self.close() def __repr__(self) -> str: - return ( - f"file_name: {str(self.file_name)}\n" - f"file_stream: {self._file_stream}\nis_open: {self.is_open}" - ) + return f"file_name: {str(self._file_name)}\n" + f"is_open: {self.is_open}" + + @property + def file_length(self) -> int: + return self._file_length @property def file_name(self) -> Path: @@ -48,6 +50,7 @@ def is_closed(self) -> bool: def open(self) -> bool: self._file_stream = open(self._file_name, "wb") + self._file_length = 0 return self.is_open() def close(self) -> bool: @@ -59,20 +62,13 @@ def close(self) -> bool: return self.is_closed() def make_write_stamp(self) -> Timestamp: - return get_monotonic_now(semantics="events_file/write") + return get_monotonic_now(semantics="log/write") - def write( - self, uri: Uri, message: Message, timestamps: Optional[List[Timestamp]] = None + def write_raw( + self, uri: Uri, message: Message, timestamps: List[Timestamp] ) -> None: assert self.is_open(), ("Event log is not open:", self.file_name) - if timestamps is None: - timestamps = [] - timestamps.append(self.make_write_stamp()) file_stream = cast(IO, self._file_stream) - assert uri.scheme == message.DESCRIPTOR.full_name, ( - uri.scheme, - message.DESCRIPTOR.full_name, - ) payload = message.SerializeToString() event = Event( uri=uri, @@ -81,6 +77,15 @@ def write( ).SerializeToString() # note that < (little endian), I (4 bytes unsigned integer) event_len: bytes = struct.pack(" None: + if timestamps is None: + timestamps = [] + timestamps.append(self.make_write_stamp()) + uri = make_proto_uri(path=path, message=message) + self.write_raw(uri=uri, message=message, timestamps=timestamps) diff --git a/py/farm_ng/core/stamp.py b/py/farm_ng/core/stamp.py index 6dc3c660..3b82e8a2 100644 --- a/py/farm_ng/core/stamp.py +++ b/py/farm_ng/core/stamp.py @@ -1,17 +1,6 @@ import time from farm_ng.core.timestamp_pb2 import Timestamp -import platform - -_g_host_name = platform.node() - - -def set_host_name(name: str) -> None: - global _g_host_name - _g_host_name = name - - -def get_host_name() -> str: - return _g_host_name +from farm_ng.core.uri import get_host_name def get_monotonic_now(semantics: str) -> Timestamp: diff --git a/py/farm_ng/core/uri.py b/py/farm_ng/core/uri.py index f3e448a1..926bd683 100644 --- a/py/farm_ng/core/uri.py +++ b/py/farm_ng/core/uri.py @@ -1,33 +1,38 @@ from farm_ng.core import uri_pb2 +from google.protobuf.message import Message +import platform +_g_host_name = platform.node() -class Uri: - def __init__(self, uri: uri_pb2.Uri) -> None: - self._uri = uri - def __eq__(self, other: "Uri") -> bool: - assert isinstance(other, Uri), f"Expected Uri type. Got: {type(other)}" - return self.string() == other.string() +def set_host_name(name: str) -> None: + global _g_host_name + _g_host_name = name - def __repr__(self) -> str: - return self.string() - @property - def proto(self) -> uri_pb2.Uri: - return self._uri +def get_host_name() -> str: + return _g_host_name - @classmethod - def empty(cls) -> "Uri": - return Uri(uri_pb2.Uri()) - def string(self) -> str: - return f"{self.proto.scheme}://{self.proto.authority}/{self.proto.path}?{self.proto.query}" +def get_authority() -> str: + return get_host_name() - @classmethod - def from_string(cls, string: str) -> "Uri": - scheme_name, remainder = string.split("://") - remainder, query = remainder.split("?") - authority, path = remainder.split("/") - return cls( - uri_pb2.Uri(scheme=scheme_name, authority=authority, path=path, query=query) - ) + +def make_proto_uri(path, message: Message) -> uri_pb2.Uri: + return uri_pb2.Uri( + scheme="protobuf", + authority=get_authority(), + path=path, + query="type=" + message.DESCRIPTOR.full_name, + ) + + +def uri_to_string(uri: uri_pb2.Uri) -> str: + return f"{uri.scheme}://{uri.authority}/{uri.path}?{uri.query}" + + +def string_to_uri(string: str) -> uri_pb2.Uri: + scheme_name, remainder = string.split("://") + remainder, query = remainder.split("?") + authority, path = remainder.split("/") + return uri_pb2.Uri(scheme=scheme_name, authority=authority, path=path, query=query) diff --git a/py/tests/tests/test_core.py b/py/tests/tests/test_core.py index 32cbfa7a..0b10236b 100644 --- a/py/tests/tests/test_core.py +++ b/py/tests/tests/test_core.py @@ -1,10 +1,10 @@ from pathlib import Path import pytest -from farm_ng.core import uri_pb2 from farm_ng.core import timestamp_pb2 from farm_ng.core.events_file_reader import EventsFileReader from farm_ng.core.events_file_writer import EventsFileWriter +from farm_ng.core.stamp import get_monotonic_now @pytest.fixture(name="log_file") @@ -31,55 +31,46 @@ def test_open_close(self, log_file: Path) -> None: assert not writer.is_open() assert writer.file_name == log_file - def test_write_images(self, writer: EventsFileWriter) -> None: + def test_write_images(self, log_file: Path) -> None: + writer = EventsFileWriter(log_file) assert writer.open() - uri = uri_pb2.Uri() - writer.write(uri) + writer.write("test/uri", message=get_monotonic_now(semantics="test/monotonic")) assert writer.close() + assert writer.file_length == 181, writer.file_length class TestEventsReader: - def test_smoke(self, reader: EventsFileReader) -> None: + def test_smoke(self, log_file: Path) -> None: + with EventsFileWriter(log_file) as writer: + writer.write( + path="hello", message=get_monotonic_now(semantics="test/monotonic") + ) # empty object - assert reader.is_closed() - assert not reader.is_open() - assert reader.file_name is None + with EventsFileReader(log_file) as reader: + assert reader.is_open() + print(reader.uris()) - def test_open_close( - self, writer: EventsFileWriter, reader: EventsFileReader - ) -> None: - # touch file - assert writer.open() - assert writer.close() - # open the file - assert reader.open() - assert not reader.is_closed() - assert reader.is_open() - assert reader.file_name.name == "event.log" - # close the file - assert reader.close() - assert reader.is_closed() - assert not reader.is_open() - assert reader.file_name is None + def test_write_read(self, log_file: Path) -> None: + with EventsFileWriter(log_file) as writer: + for i in range(10): + time_stamp = timestamp_pb2.Timestamp(stamp=i) + writer.write(path="hello", message=time_stamp) + writer.write(path="world", message=time_stamp) + print(writer.file_length) + # empty object + with EventsFileReader(log_file) as reader: + assert reader.is_open() + uris = reader.uris() - def test_write_read( - self, writer: EventsFileWriter, reader: EventsFileReader - ) -> None: - # write file - assert writer.open() - for i in range(1, 10): - time_stamp = timestamp_pb2.Timestamp(stamp=i) - writer.write(time_stamp) - assert writer.close() - # read back the data - assert reader.open() - count = 1 - while True: - res = reader.read() - if res is None: - break - _, msg = res - assert msg.stamp == count - count += 1 + assert len(reader.uris()) == 2, uris + assert uris[0].path == "hello" + assert uris[1].path == "world" + count = 0 + for event, message in reader.read_messages(): + if event.uri.path == "hello": + assert message.stamp == count + elif event.uri.path == "world": + assert message.stamp == count + count += 1 assert reader.close() From b9b3cb4b9fc5257ec2e55353d8fc24d8f0e04ac7 Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Sun, 9 Oct 2022 20:48:06 -0700 Subject: [PATCH 19/29] More coverage. --- py/farm_ng/core/events_file_reader.py | 2 +- py/farm_ng/core/uri.py | 4 +++- py/tests/tests/test_core.py | 14 ++++++++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/py/farm_ng/core/events_file_reader.py b/py/farm_ng/core/events_file_reader.py index 4fffa073..882d44a2 100644 --- a/py/farm_ng/core/events_file_reader.py +++ b/py/farm_ng/core/events_file_reader.py @@ -93,7 +93,7 @@ def reset_offsets(self) -> None: def uris(self) -> List[Uri]: if self.offsets is None: return [] - return [string_to_uri(key) for key in self.offsets.keys()] + return [string_to_uri(key) for key in sorted(self.offsets.keys())] def has_uri(self, uri: Uri) -> bool: if self.offsets is None: diff --git a/py/farm_ng/core/uri.py b/py/farm_ng/core/uri.py index 926bd683..44d76bcd 100644 --- a/py/farm_ng/core/uri.py +++ b/py/farm_ng/core/uri.py @@ -34,5 +34,7 @@ def uri_to_string(uri: uri_pb2.Uri) -> str: def string_to_uri(string: str) -> uri_pb2.Uri: scheme_name, remainder = string.split("://") remainder, query = remainder.split("?") - authority, path = remainder.split("/") + authority_path = remainder.split("/") + authority = authority_path[0] + path = "/".join(authority_path[1:]) return uri_pb2.Uri(scheme=scheme_name, authority=authority, path=path, query=query) diff --git a/py/tests/tests/test_core.py b/py/tests/tests/test_core.py index 0b10236b..5bb978c5 100644 --- a/py/tests/tests/test_core.py +++ b/py/tests/tests/test_core.py @@ -43,12 +43,22 @@ class TestEventsReader: def test_smoke(self, log_file: Path) -> None: with EventsFileWriter(log_file) as writer: writer.write( - path="hello", message=get_monotonic_now(semantics="test/monotonic") + path="hello/world", + message=get_monotonic_now(semantics="test/monotonic"), ) + writer.write( + path="/leading/slash", + message=get_monotonic_now(semantics="test/monotonic"), + ) + # em # empty object with EventsFileReader(log_file) as reader: assert reader.is_open() - print(reader.uris()) + uris = reader.uris() + print(uris) + # note lexographic ordering of paths. + assert uris[0].path == "/leading/slash" + assert uris[1].path == "hello/world" def test_write_read(self, log_file: Path) -> None: with EventsFileWriter(log_file) as writer: From f3d99665b2038fc0ac2ec8a2eb12766422b50e5e Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Sun, 9 Oct 2022 20:52:57 -0700 Subject: [PATCH 20/29] Unused imports. --- py/farm_ng/core/events_file_reader.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/py/farm_ng/core/events_file_reader.py b/py/farm_ng/core/events_file_reader.py index 882d44a2..ceb02882 100644 --- a/py/farm_ng/core/events_file_reader.py +++ b/py/farm_ng/core/events_file_reader.py @@ -1,6 +1,4 @@ import importlib -from importlib.resources import Package -import sys from collections import defaultdict from pathlib import Path from typing import Any, Tuple @@ -56,7 +54,6 @@ def __repr__(self) -> str: f"file_name: {str(self.file_name)} " + f"file_stream: {self._file_stream} " + f"is_open: {self.is_open()} " - + f"file_length: {self.file_length}" ) @property From 24858afdabf2821f9495fef9bd58dbf24e8e995d Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Sun, 9 Oct 2022 21:00:53 -0700 Subject: [PATCH 21/29] Version bump to 0.1.0 on python to match c++ --- py/setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/setup.cfg b/py/setup.cfg index 1ef03dc4..f48079d6 100644 --- a/py/setup.cfg +++ b/py/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = farm_ng_core -version = 0.0.1-dev +version = 0.1.0-dev description = long_description = file: README.md author = Farm-ng Inc. From 36f7b50b25e620179578ef5a830d5572a18ab1bc Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Sun, 9 Oct 2022 21:16:55 -0700 Subject: [PATCH 22/29] seek working, TODO rethink the behavior here. --- py/farm_ng/core/events_file_reader.py | 29 +++++++++++++++------------ py/tests/tests/test_core.py | 17 +++++++++++----- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/py/farm_ng/core/events_file_reader.py b/py/farm_ng/core/events_file_reader.py index ceb02882..eb916fba 100644 --- a/py/farm_ng/core/events_file_reader.py +++ b/py/farm_ng/core/events_file_reader.py @@ -40,7 +40,7 @@ def __init__(self, file_name: Path) -> None: # store the bytes offset in a dictionary where: # - the key is the string representation of the Uri of the message # - the value is a list of offsets to messages with that Uri - self.offsets: DefaultDict[Uri, List[int]] = defaultdict(list) + self._offsets: DefaultDict[str, List[int]] = defaultdict(list) def __enter__(self): self.open() @@ -73,7 +73,6 @@ def is_closed(self) -> bool: def open(self) -> bool: self._file_stream = open(self._file_name, "rb") self._file_length = os.path.getsize(self._file_name) - self._compute_offsets() return self.is_open() def close(self) -> bool: @@ -85,17 +84,17 @@ def close(self) -> bool: return self.is_closed() def reset_offsets(self) -> None: - self.offsets = defaultdict(list) + self._offsets = defaultdict(list) def uris(self) -> List[Uri]: - if self.offsets is None: - return [] - return [string_to_uri(key) for key in sorted(self.offsets.keys())] + if len(self._offsets) == 0: + self._compute_offsets() + return [string_to_uri(key) for key in sorted(self._offsets.keys())] def has_uri(self, uri: Uri) -> bool: - if self.offsets is None: + if self._offsets is None: return False - return uri_to_string(uri) in self.offsets.keys() + return uri_to_string(uri) in self._offsets.keys() def _read_next_event(self) -> Event: buffer = self._file_stream.read(4) @@ -125,27 +124,30 @@ def _compute_offsets(self) -> None: current_offset = file_stream.tell() try: event = self._read_next_event() - self.offsets[uri_to_string(event.uri)].append(current_offset) + self._offsets[uri_to_string(event.uri)].append(current_offset) self._skip_next_message(event) except EOFError: break file_stream.seek(0) + def offsets(self, uri: Uri) -> List[int]: + return self._offsets[uri_to_string(uri)] + def num_frames(self, uri: Uri) -> int: - if not self.offsets: + if not self._offsets: self._compute_offsets() if not self.has_uri(uri): return 0 - return len(self.offsets[uri]) + return len(self.offsets(uri)) def seek(self, uri: Uri, frame_id: int) -> None: - assert uri in self.offsets.keys() + assert self.has_uri(uri) assert frame_id < self.num_frames(uri) assert self.is_open() file_stream = cast(IO, self._file_stream) - file_stream.seek(self.offsets[uri_to_string(uri)][frame_id]) + file_stream.seek(self._offsets[uri_to_string(uri)][frame_id]) def read(self) -> Tuple[Event, Any]: assert self.is_open() @@ -163,6 +165,7 @@ def read(self) -> Tuple[Event, Any]: return event, message def read_messages(self): + self._file_stream.seek(0) try: while True: event, message = self.read() diff --git a/py/tests/tests/test_core.py b/py/tests/tests/test_core.py index 5bb978c5..4caa69c7 100644 --- a/py/tests/tests/test_core.py +++ b/py/tests/tests/test_core.py @@ -70,11 +70,6 @@ def test_write_read(self, log_file: Path) -> None: # empty object with EventsFileReader(log_file) as reader: assert reader.is_open() - uris = reader.uris() - - assert len(reader.uris()) == 2, uris - assert uris[0].path == "hello" - assert uris[1].path == "world" count = 0 for event, message in reader.read_messages(): if event.uri.path == "hello": @@ -83,4 +78,16 @@ def test_write_read(self, log_file: Path) -> None: assert message.stamp == count count += 1 + uris = reader.uris() + + assert len(reader.uris()) == 2, uris + assert uris[0].path == "hello" + assert uris[1].path == "world" + assert reader.num_frames(uris[0]) == 10 + assert reader.num_frames(uris[1]) == 10 + for frame_n in range(reader.num_frames(uris[0])): + reader.seek(uri=uris[0], frame_id=frame_n) + event, message = reader.read() + assert message.stamp == frame_n + assert reader.close() From b3ca19784404b12d8dddb81487a86b5af852bcda Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Sun, 9 Oct 2022 21:46:18 -0700 Subject: [PATCH 23/29] offsets -> event index. --- py/farm_ng/core/events_file_reader.py | 61 ++++++++++++++------------- py/tests/tests/test_core.py | 23 +++++++--- 2 files changed, 50 insertions(+), 34 deletions(-) diff --git a/py/farm_ng/core/events_file_reader.py b/py/farm_ng/core/events_file_reader.py index eb916fba..0c335868 100644 --- a/py/farm_ng/core/events_file_reader.py +++ b/py/farm_ng/core/events_file_reader.py @@ -12,6 +12,7 @@ from farm_ng.core.event_pb2 import Event from farm_ng.core.uri_pb2 import Uri from farm_ng.core.uri import uri_to_string, string_to_uri +from google.protobuf.message import Message def parse_protobuf_descriptor(uri: Uri) -> Tuple[str, str]: @@ -40,7 +41,7 @@ def __init__(self, file_name: Path) -> None: # store the bytes offset in a dictionary where: # - the key is the string representation of the Uri of the message # - the value is a list of offsets to messages with that Uri - self._offsets: DefaultDict[str, List[int]] = defaultdict(list) + self._event_index: DefaultDict[str, List[Tuple(Event, int)]] = defaultdict(list) def __enter__(self): self.open() @@ -83,18 +84,18 @@ def close(self) -> bool: self._file_stream = None return self.is_closed() - def reset_offsets(self) -> None: - self._offsets = defaultdict(list) + def _reset_event_index(self) -> None: + self._event_index = defaultdict(list) def uris(self) -> List[Uri]: - if len(self._offsets) == 0: - self._compute_offsets() - return [string_to_uri(key) for key in sorted(self._offsets.keys())] + if not self._event_index: + self._build_event_index() + return [string_to_uri(key) for key in sorted(self._event_index.keys())] def has_uri(self, uri: Uri) -> bool: - if self._offsets is None: + if self._event_index is None: return False - return uri_to_string(uri) in self._offsets.keys() + return uri_to_string(uri) in self._event_index.keys() def _read_next_event(self) -> Event: buffer = self._file_stream.read(4) @@ -111,7 +112,7 @@ def _skip_next_message(self, event: Event) -> None: msg_bytes: int = event.payload_length self._file_stream.seek(msg_bytes, 1) - def _compute_offsets(self) -> None: + def _build_event_index(self) -> None: if not self.is_open(): raise Exception("Reader not open. Please, use reader.open()") @@ -119,40 +120,38 @@ def _compute_offsets(self) -> None: file_stream.seek(0) # clear, if any the previous offsets - self.reset_offsets() + self._reset_event_index() while True: - current_offset = file_stream.tell() try: event = self._read_next_event() - self._offsets[uri_to_string(event.uri)].append(current_offset) + current_offset = file_stream.tell() + self._event_index[uri_to_string(event.uri)].append( + (event, current_offset) + ) self._skip_next_message(event) except EOFError: break file_stream.seek(0) - def offsets(self, uri: Uri) -> List[int]: - return self._offsets[uri_to_string(uri)] + def events(self, uri: Uri) -> List[Tuple[Event, int]]: + if not self._event_index: + self._build_event_index() + return self._event_index[uri_to_string(uri)] - def num_frames(self, uri: Uri) -> int: - if not self._offsets: - self._compute_offsets() + def num_events(self, uri: Uri) -> int: if not self.has_uri(uri): return 0 - return len(self.offsets(uri)) + return len(self.events(uri)) - def seek(self, uri: Uri, frame_id: int) -> None: + def get_event(self, uri: Uri, frame_id: int) -> Tuple[Event, int]: assert self.has_uri(uri) - assert frame_id < self.num_frames(uri) - assert self.is_open() - - file_stream = cast(IO, self._file_stream) - file_stream.seek(self._offsets[uri_to_string(uri)][frame_id]) - - def read(self) -> Tuple[Event, Any]: - assert self.is_open() - event = self._read_next_event() + assert frame_id < self.num_events(uri) + return self.events(uri)[frame_id] + def read_message(self, event: Event, offset: Optional[int] = None) -> Message: + if offset is not None: + self._file_stream.seek(offset, 0) name, package = parse_protobuf_descriptor(event.uri) message_cls = getattr(importlib.import_module(package), name) @@ -161,8 +160,12 @@ def read(self) -> Tuple[Event, Any]: message = message_cls() message.ParseFromString(payload) + return message - return event, message + def read(self) -> Tuple[Event, Message]: + assert self.is_open() + event = self._read_next_event() + return event, self.read_message(event) def read_messages(self): self._file_stream.seek(0) diff --git a/py/tests/tests/test_core.py b/py/tests/tests/test_core.py index 4caa69c7..62664ca5 100644 --- a/py/tests/tests/test_core.py +++ b/py/tests/tests/test_core.py @@ -80,14 +80,27 @@ def test_write_read(self, log_file: Path) -> None: uris = reader.uris() + # TODO This api is likely in flux + # there are several ways we want to seek and + # iterate over the reader + # frame order, per uri + # time order, per uri + # time order, all uris + # + # Perhaps you can just get a list of all events + # and filter it at the user level. + # Then: reader.read(offset, event) will seek to and read the given event + for event, offset in reader.events(uri=uris[0]): + print(offset, event.timestamps[-1].stamp) + assert len(reader.uris()) == 2, uris assert uris[0].path == "hello" assert uris[1].path == "world" - assert reader.num_frames(uris[0]) == 10 - assert reader.num_frames(uris[1]) == 10 - for frame_n in range(reader.num_frames(uris[0])): - reader.seek(uri=uris[0], frame_id=frame_n) - event, message = reader.read() + assert reader.num_events(uris[0]) == 10 + assert reader.num_events(uris[1]) == 10 + for frame_n in range(reader.num_events(uris[0])): + event, offset = reader.get_event(uri=uris[0], frame_id=frame_n) + message = reader.read_message(event, offset) assert message.stamp == frame_n assert reader.close() From 82c05ae3b4b5cb171f757093240f388a6e026251 Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Sun, 9 Oct 2022 23:39:59 -0700 Subject: [PATCH 24/29] Add c++ reader/writer first pass. --- .gitignore | 1 + cpp/farm_ng/core/prototools/CMakeLists.txt | 11 +- .../core/prototools/event_log_reader.cpp | 111 +++++++++++++++++ .../core/prototools/event_log_reader.h | 68 +++++++++++ .../core/prototools/event_log_writer.cpp | 113 ++++++++++++++++++ .../core/prototools/event_log_writer.h | 63 ++++++++++ .../core/prototools/event_log_writer_test.cpp | 54 +++++++++ 7 files changed, 417 insertions(+), 4 deletions(-) create mode 100644 cpp/farm_ng/core/prototools/event_log_reader.cpp create mode 100644 cpp/farm_ng/core/prototools/event_log_reader.h create mode 100644 cpp/farm_ng/core/prototools/event_log_writer.cpp create mode 100644 cpp/farm_ng/core/prototools/event_log_writer.h create mode 100644 cpp/farm_ng/core/prototools/event_log_writer_test.cpp diff --git a/.gitignore b/.gitignore index 8432802f..17380e50 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ build venv +.pytest_cache diff --git a/cpp/farm_ng/core/prototools/CMakeLists.txt b/cpp/farm_ng/core/prototools/CMakeLists.txt index 9f53b659..8c1b60cc 100644 --- a/cpp/farm_ng/core/prototools/CMakeLists.txt +++ b/cpp/farm_ng/core/prototools/CMakeLists.txt @@ -9,17 +9,20 @@ farm_ng_add_library(farm_ng_core_prototools INCLUDE_DIR ../../.. HEADERS proto_reader_writer.h + event_log_writer.h + event_log_reader.h SOURCES proto_reader_writer.cpp - + event_log_writer.cpp + event_log_reader.cpp ) -target_link_libraries(farm_ng_core_prototools protobuf::libprotobuf farm_ng_core_logging) +target_link_libraries(farm_ng_core_prototools protobuf::libprotobuf farm_ng_core_logging farm_ng_core_proto_defs) -foreach(test_basename proto_reader_writer) +foreach(test_basename proto_reader_writer event_log_writer) farm_ng_add_test(${test_basename} PARENT_LIBRARY farm_ng_core_prototools - LINK_LIBRARIES farm_ng_core_prototools + LINK_LIBRARIES farm_ng_core_prototools farm_ng_core_misc LABELS small) endforeach() diff --git a/cpp/farm_ng/core/prototools/event_log_reader.cpp b/cpp/farm_ng/core/prototools/event_log_reader.cpp new file mode 100644 index 00000000..f991a87b --- /dev/null +++ b/cpp/farm_ng/core/prototools/event_log_reader.cpp @@ -0,0 +1,111 @@ +// Copyright (c) farm-ng, inc. +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +// Copyright (c) farm-ng, inc. All rights reserved. + +#include "farm_ng/core/prototools/event_log_reader.h" + +#include "farm_ng/core/logging/logger.h" + +#include +#include +#include +#include + +namespace farm_ng { + +class EventLogReaderBinaryImpl : public EventLogReaderImpl { + public: + EventLogReaderBinaryImpl(std::filesystem::path const& log_path) + : log_path(log_path), in(log_path.string(), std::ofstream::binary) { + if (!in) { + throw std::runtime_error( + FARM_FORMAT("Could not open file: {}", log_path)); + } + } + + std::string readBytes(uint64_t n_bytes) { + std::string str; + str.resize(n_bytes); + in.read(str.data(), n_bytes); + if (!in) { + throw std::runtime_error("Could not read data."); + } + return str; + } + + uint64_t readEventSize() { + uint64_t n_bytes = 0; + in.read(reinterpret_cast(&n_bytes), sizeof(n_bytes)); + if (!in) { + throw std::runtime_error("Could not read packet length header"); + } + return n_bytes; + } + + virtual std::tuple readNextEvent( + std::string* payload = nullptr) override { + core::proto::Event event; + if (!event.ParseFromString(readBytes(readEventSize()))) { + throw std::runtime_error("Could not parse event."); + } + std::streampos pos = in.tellg(); + if (payload) { + *payload = readBytes(event.payload_length()); + } else { + in.seekg(event.payload_length(), in.cur); + if (!in) { + throw std::runtime_error("Could not seek past payload."); + } + } + return std::make_tuple(event, pos); + } + virtual std::string readPayload( + core::proto::Event const& event, std::streampos pos) { + in.seekg(pos); + if (!in) { + throw std::runtime_error("Could not seek to read payload."); + } + return readBytes(event.payload_length()); + } + + std::filesystem::path getPath() const override { return log_path; } + + std::filesystem::path log_path; + std::ifstream in; +}; + +namespace { +void resetImpl( + std::unique_ptr& impl, + std::filesystem::path const& log_path) { + impl = std::make_unique(log_path); +} +} // namespace + +EventLogReader::EventLogReader(std::filesystem::path const& log_path) { + resetImpl(impl_, log_path); +} + +EventLogReader::~EventLogReader() { impl_.reset(nullptr); } + +std::tuple EventLogReader::readNextEvent( + std::string* payload) { + return impl_->readNextEvent(payload); +} + +std::string EventLogReader::readPayload( + core::proto::Event const& event, std::streampos pos) { + return impl_->readPayload(event, pos); +} + +std::filesystem::path EventLogReader::getPath() const { + return impl_->getPath(); +} + +void EventLogReader::reset() { resetImpl(impl_, impl_->getPath()); } + +} // namespace farm_ng diff --git a/cpp/farm_ng/core/prototools/event_log_reader.h b/cpp/farm_ng/core/prototools/event_log_reader.h new file mode 100644 index 00000000..ce148542 --- /dev/null +++ b/cpp/farm_ng/core/prototools/event_log_reader.h @@ -0,0 +1,68 @@ +// Copyright (c) farm-ng, inc. +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +// Copyright (c) farm-ng, inc. All rights reserved. + +#pragma once + +#include "farm_ng/core/event.pb.h" + +#include +#include +#include + +namespace farm_ng { + +/// Implementation of the `EventLogReader` class +/// +class EventLogReaderImpl { + public: + /// https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rh-dtor + virtual ~EventLogReaderImpl() {} + + /// Returns next event. + virtual std::tuple readNextEvent( + std::string* payload = nullptr) = 0; + + virtual std::string readPayload( + core::proto::Event const& event, std::streampos pos) = 0; + + /// Returns the path including the fileaname + virtual std::filesystem::path getPath() const = 0; +}; + +/// Reader to deserialize data written by the EventLogWriter. +/// +class EventLogReader { + public: + /// Open's log file to read. + /// + /// Throws runtime-error if files could not be opened or if logfile does not + /// contain a valid header. + explicit EventLogReader(std::filesystem::path const& log_path); + + EventLogReader(EventLogReader&&) = default; + EventLogReader& operator=(EventLogReader&&) = default; + + /// https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rh-dtor + virtual ~EventLogReader(); + + std::tuple readNextEvent( + std::string* payload = nullptr); + + std::string readPayload(core::proto::Event const& event, std::streampos pos); + + /// Returns the path including the fileaname + std::filesystem::path getPath() const; + + /// Reset the writer to the beginning of the file + void reset(); + + private: + std::unique_ptr impl_; +}; + +} // namespace farm_ng diff --git a/cpp/farm_ng/core/prototools/event_log_writer.cpp b/cpp/farm_ng/core/prototools/event_log_writer.cpp new file mode 100644 index 00000000..45a04a77 --- /dev/null +++ b/cpp/farm_ng/core/prototools/event_log_writer.cpp @@ -0,0 +1,113 @@ +// Copyright (c) farm-ng, inc. +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +// Copyright (c) farm-ng, inc. All rights reserved. + +#include "farm_ng/core/prototools/event_log_writer.h" + +#include +#include + +#include +#include +#include +#include +#include + +char hostname[HOST_NAME_MAX]; +char username[LOGIN_NAME_MAX]; + +namespace farm_ng { +namespace { +std::string getAuthoritySlow() { + char hostname[HOST_NAME_MAX]; + gethostname(hostname, HOST_NAME_MAX); + return std::string(hostname); +} +std::string const& getAuthority() { + static std::string host_name = getAuthoritySlow(); + return host_name; +} +double monotonic() { + double now = 1e-6 * std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + return now; +} +core::proto::Timestamp makeWriteStamp() { + core::proto::Timestamp stamp; + stamp.set_stamp(monotonic()); + stamp.set_semantics("log/write"); + stamp.set_clock_name(getAuthority() + "/monotonic"); + return stamp; +} + +} // namespace + +class EventLogWriterBinaryImpl : public EventLogWriterImpl { + public: + EventLogWriterBinaryImpl(std::filesystem::path const& log_path) + : log_path(log_path), out(log_path.string(), std::ofstream::binary) {} + + void write( + std::string path, + google::protobuf::Message const& message, + std::vector const& timestamps) override { + std::string payload; + message.SerializeToString(&payload); + core::proto::Event event; + // see py/farm_ng/core/uri.py + core::proto::Uri* uri = event.mutable_uri(); + uri->set_scheme("protobuf"); + uri->set_authority(getAuthority()); + uri->set_path(path); + uri->set_query("type=" + message.GetDescriptor()->full_name()); + + for (core::proto::Timestamp const& stamp : timestamps) { + event.add_timestamps()->CopyFrom(stamp); + } + event.add_timestamps()->CopyFrom(makeWriteStamp()); + event.set_payload_length(payload.size()); + std::string event_str; + event.SerializeToString(&event_str); + uint64_t n_bytes = event_str.size(); + out.write(reinterpret_cast(&n_bytes), sizeof(n_bytes)); + out << event_str; + out << payload; + out.flush(); + } + + std::filesystem::path log_path; + std::ofstream out; +}; + +EventLogWriter::EventLogWriter(std::filesystem::path const& log_path) noexcept + : log_path_(log_path) { + // generate the directory tree in case it's empty or doesn't exist + + std::filesystem::path path_prefix = log_path; + path_prefix.remove_filename(); + + if (!std::filesystem::exists(path_prefix)) { + FARM_CHECK( + std::filesystem::create_directories(path_prefix), + "Could not create the directory: {}. It might exist already, please " + "check it out", + path_prefix); + } + impl_ = std::make_unique(log_path); +} + +EventLogWriter::~EventLogWriter() noexcept { impl_.reset(nullptr); } + +void EventLogWriter::write( + std::string path, + google::protobuf::Message const& message, + std::vector const& timestamps) { + impl_->write(path, message, timestamps); +} + +} // namespace farm_ng diff --git a/cpp/farm_ng/core/prototools/event_log_writer.h b/cpp/farm_ng/core/prototools/event_log_writer.h new file mode 100644 index 00000000..dd2994e5 --- /dev/null +++ b/cpp/farm_ng/core/prototools/event_log_writer.h @@ -0,0 +1,63 @@ +// Copyright (c) farm-ng, inc. +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +// Copyright (c) farm-ng, inc. All rights reserved. + +#pragma once + +#include "farm_ng/core/event.pb.h" + +#include +#include +#include + +namespace farm_ng { + +/// Implementation of the `EventLogWriter` class +/// +class EventLogWriterImpl { + public: + /// https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rh-dtor + virtual ~EventLogWriterImpl() {} + + virtual void write( + std::string path, + google::protobuf::Message const& message, + std::vector const& timestamps) = 0; +}; + +/// Class that serializes incoming protobuf events to a file in disk. +class EventLogWriter { + public: + /// Main constructor of the class. + /// + /// Precondition: It must be possible to create the folder ``log_path_`` if it + /// does not exist. If ``log-path`` does exist, it must be a folder and it + /// must be empty. + /// + EventLogWriter(std::filesystem::path const& log_path) noexcept; + + /// Main destructor + virtual ~EventLogWriter() noexcept; + + /// Writes an incoming protobuf in the log file + void write( + std::string path, + google::protobuf::Message const& message, + std::vector const& timestamps = + std::vector()); + + /// Returns the path including the fileaname + std::filesystem::path getPath() const { return log_path_; } + + private: + // The log path including the filename + std::filesystem::path log_path_; + // Implementation pointer of the class + std::unique_ptr impl_; +}; + +} // namespace farm_ng diff --git a/cpp/farm_ng/core/prototools/event_log_writer_test.cpp b/cpp/farm_ng/core/prototools/event_log_writer_test.cpp new file mode 100644 index 00000000..16b30a24 --- /dev/null +++ b/cpp/farm_ng/core/prototools/event_log_writer_test.cpp @@ -0,0 +1,54 @@ +// Copyright (c) farm-ng, inc. +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +// Copyright (c) farm-ng, inc. All rights reserved. + +#include "farm_ng/core/prototools/event_log_writer.h" + +#include "farm_ng/core/prototools/event_log_reader.h" + +#include +#include +#include +#include + +using namespace farm_ng; +namespace { +std::string getHostName() { + char hostname[HOST_NAME_MAX]; + gethostname(hostname, HOST_NAME_MAX); + return std::string(hostname); +} +} // namespace + +TEST(event_log, roundtrip) { + auto maybe_log_dir = createUniqueTemporaryDirectory(); + auto log_dir = FARM_UNWRAP(maybe_log_dir); + auto file = log_dir / "events.log"; + { + EventLogWriter writer(file); + google::protobuf::Int32Value x; + for (int i = 0; i < 10; ++i) { + x.set_value(i); + writer.write("my_ints", x); + } + } + { + EventLogReader reader(file); + for (int i = 0; i < 10; ++i) { + std::string payload; + auto [event, pos] = reader.readNextEvent(&payload); + google::protobuf::Int32Value x; + EXPECT_EQ(true, x.ParseFromString(payload)); + EXPECT_EQ(i, x.value()); + EXPECT_EQ("protobuf", event.uri().scheme()); + EXPECT_EQ("my_ints", event.uri().path()); + EXPECT_EQ(getHostName(), event.uri().authority()); + EXPECT_EQ("type=google.protobuf.Int32Value", event.uri().query()); + } + EXPECT_THROW(reader.readNextEvent(), std::runtime_error); + } +} From d976d308d30b1f10e53c7a6ccde8f229f1fd79a0 Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Mon, 10 Oct 2022 00:02:13 -0700 Subject: [PATCH 25/29] Write in c++ manually verified read in python. --- cpp/farm_ng/core/prototools/event_log_reader.cpp | 2 +- cpp/farm_ng/core/prototools/event_log_writer.cpp | 2 +- .../core/prototools/event_log_writer_test.cpp | 15 +++++++-------- py/farm_ng/core/events_file_reader.py | 14 ++++++++++---- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/cpp/farm_ng/core/prototools/event_log_reader.cpp b/cpp/farm_ng/core/prototools/event_log_reader.cpp index f991a87b..963d3d80 100644 --- a/cpp/farm_ng/core/prototools/event_log_reader.cpp +++ b/cpp/farm_ng/core/prototools/event_log_reader.cpp @@ -38,7 +38,7 @@ class EventLogReaderBinaryImpl : public EventLogReaderImpl { } uint64_t readEventSize() { - uint64_t n_bytes = 0; + uint32_t n_bytes = 0; in.read(reinterpret_cast(&n_bytes), sizeof(n_bytes)); if (!in) { throw std::runtime_error("Could not read packet length header"); diff --git a/cpp/farm_ng/core/prototools/event_log_writer.cpp b/cpp/farm_ng/core/prototools/event_log_writer.cpp index 45a04a77..c1dbedbd 100644 --- a/cpp/farm_ng/core/prototools/event_log_writer.cpp +++ b/cpp/farm_ng/core/prototools/event_log_writer.cpp @@ -73,7 +73,7 @@ class EventLogWriterBinaryImpl : public EventLogWriterImpl { event.set_payload_length(payload.size()); std::string event_str; event.SerializeToString(&event_str); - uint64_t n_bytes = event_str.size(); + uint32_t n_bytes = event_str.size(); out.write(reinterpret_cast(&n_bytes), sizeof(n_bytes)); out << event_str; out << payload; diff --git a/cpp/farm_ng/core/prototools/event_log_writer_test.cpp b/cpp/farm_ng/core/prototools/event_log_writer_test.cpp index 16b30a24..2a8f7fe1 100644 --- a/cpp/farm_ng/core/prototools/event_log_writer_test.cpp +++ b/cpp/farm_ng/core/prototools/event_log_writer_test.cpp @@ -12,7 +12,6 @@ #include #include -#include #include using namespace farm_ng; @@ -30,10 +29,10 @@ TEST(event_log, roundtrip) { auto file = log_dir / "events.log"; { EventLogWriter writer(file); - google::protobuf::Int32Value x; + core::proto::Timestamp x; for (int i = 0; i < 10; ++i) { - x.set_value(i); - writer.write("my_ints", x); + x.set_stamp(i); + writer.write("my_stamps", x); } } { @@ -41,13 +40,13 @@ TEST(event_log, roundtrip) { for (int i = 0; i < 10; ++i) { std::string payload; auto [event, pos] = reader.readNextEvent(&payload); - google::protobuf::Int32Value x; + core::proto::Timestamp x; EXPECT_EQ(true, x.ParseFromString(payload)); - EXPECT_EQ(i, x.value()); + EXPECT_EQ(i, x.stamp()); EXPECT_EQ("protobuf", event.uri().scheme()); - EXPECT_EQ("my_ints", event.uri().path()); + EXPECT_EQ("my_stamps", event.uri().path()); EXPECT_EQ(getHostName(), event.uri().authority()); - EXPECT_EQ("type=google.protobuf.Int32Value", event.uri().query()); + EXPECT_EQ("type=farm_ng.core.proto.Timestamp", event.uri().query()); } EXPECT_THROW(reader.readNextEvent(), std::runtime_error); } diff --git a/py/farm_ng/core/events_file_reader.py b/py/farm_ng/core/events_file_reader.py index 0c335868..8f491a7c 100644 --- a/py/farm_ng/core/events_file_reader.py +++ b/py/farm_ng/core/events_file_reader.py @@ -23,14 +23,20 @@ def parse_protobuf_descriptor(uri: Uri) -> Tuple[str, str]: type_split = uri.query.split("type=") assert len(type_split) == 2 desc = type_split[1].split("&")[0] # query string can have multiple key/value pairs - desc_list = desc.split(".") - name = desc_list[-1] - package = desc_list[:-2] + [name.lower() + "_pb2"] - return name, ".".join(package) + + if desc.startswith("farm_ng"): + desc_list = desc.split(".") + name = desc_list[-1] + package = desc_list[:-2] + [name.lower() + "_pb2"] + return name, ".".join(package) + else: + assert False, ("Unsupported protobuf type:", desc) class EventsFileReader: def __init__(self, file_name: Path) -> None: + if type(file_name) is str: + file_name = Path(file_name) self._file_name = file_name.absolute() assert Path(self._file_name.parents[0]).is_dir(), self._file_name assert self._file_name.exists(), self._file_name From 9e9b7c9adc6e70daa12c31c1784d5897b2bef2c8 Mon Sep 17 00:00:00 2001 From: Ethan Rublee Date: Mon, 10 Oct 2022 00:08:34 -0700 Subject: [PATCH 26/29] Get rid of filelength assert. --- py/tests/tests/test_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/py/tests/tests/test_core.py b/py/tests/tests/test_core.py index 62664ca5..321dfa57 100644 --- a/py/tests/tests/test_core.py +++ b/py/tests/tests/test_core.py @@ -36,7 +36,6 @@ def test_write_images(self, log_file: Path) -> None: assert writer.open() writer.write("test/uri", message=get_monotonic_now(semantics="test/monotonic")) assert writer.close() - assert writer.file_length == 181, writer.file_length class TestEventsReader: From 4b3030f82b1f10d77cde63eeea5512a075fc107e Mon Sep 17 00:00:00 2001 From: edgar Date: Mon, 10 Oct 2022 15:28:03 +0200 Subject: [PATCH 27/29] add more tests and typings --- py/farm_ng/core/events_file_reader.py | 71 +++++++++++--------- py/farm_ng/core/events_file_writer.py | 24 +++++-- py/farm_ng/core/uri.py | 20 ++++-- py/setup.py | 4 +- py/tests/tests/test_core.py | 97 +++++++++++++++++++-------- py/tests/tests/test_stamp.py | 10 +++ py/tests/tests/test_uri.py | 20 ++++++ 7 files changed, 171 insertions(+), 75 deletions(-) create mode 100644 py/tests/tests/test_stamp.py create mode 100644 py/tests/tests/test_uri.py diff --git a/py/farm_ng/core/events_file_reader.py b/py/farm_ng/core/events_file_reader.py index 8f491a7c..0370dc53 100644 --- a/py/farm_ng/core/events_file_reader.py +++ b/py/farm_ng/core/events_file_reader.py @@ -1,14 +1,15 @@ import importlib from collections import defaultdict from pathlib import Path -from typing import Any, Tuple +from typing import Any +from typing import Tuple from typing import cast from typing import DefaultDict from typing import IO from typing import List from typing import Optional +from typing import Union import struct -import os from farm_ng.core.event_pb2 import Event from farm_ng.core.uri_pb2 import Uri from farm_ng.core.uri import uri_to_string, string_to_uri @@ -17,43 +18,43 @@ def parse_protobuf_descriptor(uri: Uri) -> Tuple[str, str]: assert uri.scheme == "protobuf" - # parse a proto descriptor and extract the message name and package. - # See: https://developers.google.com/protocol-buffers/docs/reference/python-generated#invocation - # NOTE: the descriptor comes in the shape of `farm_ng.core.proto.Timestamp` type_split = uri.query.split("type=") assert len(type_split) == 2 desc = type_split[1].split("&")[0] # query string can have multiple key/value pairs - if desc.startswith("farm_ng"): - desc_list = desc.split(".") - name = desc_list[-1] - package = desc_list[:-2] + [name.lower() + "_pb2"] - return name, ".".join(package) - else: - assert False, ("Unsupported protobuf type:", desc) + if not desc.startswith("farm_ng"): + raise Exception(f"Unsupported protobuf type: {desc}") + + # parse a proto descriptor and extract the message name and package. + # See: https://developers.google.com/protocol-buffers/docs/reference/python-generated#invocation + # NOTE: the descriptor comes in the shape of `farm_ng.core.proto.Timestamp` + desc_list = desc.split(".") + name = desc_list[-1] + package = desc_list[:-2] + [name.lower() + "_pb2"] + return name, ".".join(package) class EventsFileReader: - def __init__(self, file_name: Path) -> None: - if type(file_name) is str: + def __init__(self, file_name: Union[str, Path]) -> None: + if isinstance(file_name, str): file_name = Path(file_name) - self._file_name = file_name.absolute() - assert Path(self._file_name.parents[0]).is_dir(), self._file_name + self._file_name: Path = file_name.absolute() assert self._file_name.exists(), self._file_name + self._file_stream: Optional[IO] = None self._file_length: int = 0 - # assert self.open() # store the bytes offset in a dictionary where: # - the key is the string representation of the Uri of the message - # - the value is a list of offsets to messages with that Uri - self._event_index: DefaultDict[str, List[Tuple(Event, int)]] = defaultdict(list) + # - the value is a list with tuples of Event and its offsets to messages with that Uri + self._event_index: DefaultDict[str, List[Tuple[Event, int]]] = defaultdict(list) - def __enter__(self): - self.open() + def __enter__(self) -> "EventsFileReader": + assert self.open() return self - def __exit__(self, type, value, traceback): + # pylint: disable=redefined-builtin + def __exit__(self, type: Any, value: Any, traceback: Any) -> None: self.close() def __repr__(self) -> str: @@ -64,10 +65,12 @@ def __repr__(self) -> str: ) @property - def file_name(self) -> Optional[Path]: - if self._file_stream is None: - return None - return Path(self._file_stream.name) + def file_length(self) -> int: + return self._file_length + + @property + def file_name(self) -> Path: + return self._file_name def is_open(self) -> bool: return not self.is_closed() @@ -79,7 +82,7 @@ def is_closed(self) -> bool: def open(self) -> bool: self._file_stream = open(self._file_name, "rb") - self._file_length = os.path.getsize(self._file_name) + self._file_length = self._file_name.stat().st_size return self.is_open() def close(self) -> bool: @@ -104,19 +107,21 @@ def has_uri(self, uri: Uri) -> bool: return uri_to_string(uri) in self._event_index.keys() def _read_next_event(self) -> Event: - buffer = self._file_stream.read(4) + file_stream = cast(IO, self._file_stream) + buffer = file_stream.read(4) if len(buffer) != 4: raise EOFError() event_len = struct.unpack(" None: msg_bytes: int = event.payload_length - self._file_stream.seek(msg_bytes, 1) + file_stream = cast(IO, self._file_stream) + file_stream.seek(msg_bytes, 1) def _build_event_index(self) -> None: if not self.is_open(): @@ -156,15 +161,15 @@ def get_event(self, uri: Uri, frame_id: int) -> Tuple[Event, int]: return self.events(uri)[frame_id] def read_message(self, event: Event, offset: Optional[int] = None) -> Message: + file_stream = cast(IO, self._file_stream) if offset is not None: - self._file_stream.seek(offset, 0) + file_stream.seek(offset, 0) name, package = parse_protobuf_descriptor(event.uri) message_cls = getattr(importlib.import_module(package), name) - file_stream = cast(IO, self._file_stream) payload: bytes = file_stream.read(event.payload_length) - message = message_cls() + message: Message = message_cls() message.ParseFromString(payload) return message diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index e94795af..0a6d8a85 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -1,9 +1,11 @@ import struct from pathlib import Path +from typing import Any from typing import cast from typing import IO from typing import Optional from typing import List +from typing import Union from farm_ng.core.stamp import get_monotonic_now from farm_ng.core.uri import make_proto_uri from google.protobuf.message import Message @@ -16,21 +18,29 @@ class EventsFileWriter: - def __init__(self, file_name: Path) -> None: + def __init__(self, file_name: Union[str, Path]) -> None: + if isinstance(file_name, str): + file_name = Path(file_name) self._file_name: Path = file_name.absolute() assert Path(self._file_name.parents[0]).is_dir() + self._file_stream: Optional[IO] = None - self._file_length = 0 + self._file_length: int = 0 - def __enter__(self): - self.open() + def __enter__(self) -> "EventsFileWriter": + assert self.open() return self - def __exit__(self, type, value, traceback): + # pylint: disable=redefined-builtin + def __exit__(self, type: Any, value: Any, traceback: Any) -> None: self.close() def __repr__(self) -> str: - return f"file_name: {str(self._file_name)}\n" + f"is_open: {self.is_open}" + return ( + f"file_name: {str(self.file_name)} " + + f"file_stream: {self._file_stream} " + + f"is_open: {self.is_open()} " + ) @property def file_length(self) -> int: @@ -67,7 +77,7 @@ def make_write_stamp(self) -> Timestamp: def write_raw( self, uri: Uri, message: Message, timestamps: List[Timestamp] ) -> None: - assert self.is_open(), ("Event log is not open:", self.file_name) + assert self.is_open(), f"Event log is not open: {self.file_name}" file_stream = cast(IO, self._file_stream) payload = message.SerializeToString() event = Event( diff --git a/py/farm_ng/core/uri.py b/py/farm_ng/core/uri.py index 44d76bcd..86656abd 100644 --- a/py/farm_ng/core/uri.py +++ b/py/farm_ng/core/uri.py @@ -1,17 +1,27 @@ +from dataclasses import dataclass +import platform + from farm_ng.core import uri_pb2 from google.protobuf.message import Message -import platform -_g_host_name = platform.node() + +@dataclass +class PlatformConfig: + host_name: str = platform.node() + + +# https://vald-phoenix.github.io/pylint-errors/plerr/errors/variables/W0603.html +platform_config = PlatformConfig() def set_host_name(name: str) -> None: - global _g_host_name - _g_host_name = name + # global HOST_NAME + # HOST_NAME = name + platform_config.host_name = name def get_host_name() -> str: - return _g_host_name + return platform_config.host_name def get_authority() -> str: diff --git a/py/setup.py b/py/setup.py index 384493f8..dd6dd4f8 100644 --- a/py/setup.py +++ b/py/setup.py @@ -9,7 +9,7 @@ class BuildProtosCommand(Command): - user_options = [] + user_options = [] # type: ignore def initialize_options(self): pass @@ -38,7 +38,7 @@ def run(self): class CleanFilesCommand(Command): - user_options = [] + user_options = [] # type: ignore def initialize_options(self): pass diff --git a/py/tests/tests/test_core.py b/py/tests/tests/test_core.py index 321dfa57..6a6157c7 100644 --- a/py/tests/tests/test_core.py +++ b/py/tests/tests/test_core.py @@ -1,14 +1,15 @@ from pathlib import Path +from typing import List, Tuple import pytest -from farm_ng.core import timestamp_pb2 -from farm_ng.core.events_file_reader import EventsFileReader +from farm_ng.core import timestamp_pb2, uri_pb2, event_pb2 +from farm_ng.core.events_file_reader import EventsFileReader, parse_protobuf_descriptor from farm_ng.core.events_file_writer import EventsFileWriter from farm_ng.core.stamp import get_monotonic_now @pytest.fixture(name="log_file") -def fixture_writer(tmpdir) -> Path: +def fixture_log_file(tmpdir) -> Path: return Path(tmpdir) / "event.log" @@ -21,20 +22,25 @@ def test_smoke(self, log_file: Path) -> None: def test_open_close(self, log_file: Path) -> None: # open the file writer = EventsFileWriter(log_file) + assert "event.log" in str(writer.file_name) + assert writer.file_length == 0 + assert writer.close() + assert writer.is_closed() + assert not writer.is_open() + + # open object assert writer.open() assert not writer.is_closed() assert writer.is_open() - assert writer.file_name.name == "event.log" - # close the file assert writer.close() - assert writer.is_closed() - assert not writer.is_open() - assert writer.file_name == log_file - def test_write_images(self, log_file: Path) -> None: + def test_write_stamp(self, log_file: Path) -> None: writer = EventsFileWriter(log_file) assert writer.open() writer.write("test/uri", message=get_monotonic_now(semantics="test/monotonic")) + assert writer.file_length == 217 + writer.write("test/uri", message=get_monotonic_now(semantics="test/monotonic")) + assert writer.file_length == 434 assert writer.close() @@ -49,8 +55,6 @@ def test_smoke(self, log_file: Path) -> None: path="/leading/slash", message=get_monotonic_now(semantics="test/monotonic"), ) - # em - # empty object with EventsFileReader(log_file) as reader: assert reader.is_open() uris = reader.uris() @@ -59,14 +63,45 @@ def test_smoke(self, log_file: Path) -> None: assert uris[0].path == "/leading/slash" assert uris[1].path == "hello/world" + def test_open_close(self, log_file: Path) -> None: + with EventsFileWriter(log_file) as writer: + writer.write( + path="hello/world", + message=get_monotonic_now(semantics="test/monotonic"), + ) + # empty object + reader = EventsFileReader(log_file) + assert "event.log" in str(reader.file_name) + assert reader.file_length == 0 + assert reader.close() + assert reader.is_closed() + assert not reader.is_open() + assert not reader.has_uri(uri_pb2.Uri()) + assert reader.num_events(uri_pb2.Uri()) == 0 + + # open object + assert reader.open() + assert reader.file_length == 220 + assert not reader.is_closed() + assert reader.is_open() + assert reader.close() + + def test_parse_proto_descriptor(self): + uri = uri_pb2.Uri( + scheme="protobuf", path="tik/tok", query="type=farm_ng.core.proto.Timestamp" + ) + name, package = parse_protobuf_descriptor(uri) + assert name == "Timestamp" + assert package == "farm_ng.core.timestamp_pb2" + def test_write_read(self, log_file: Path) -> None: + num_events = 10 with EventsFileWriter(log_file) as writer: - for i in range(10): + for i in range(num_events): time_stamp = timestamp_pb2.Timestamp(stamp=i) writer.write(path="hello", message=time_stamp) writer.write(path="world", message=time_stamp) - print(writer.file_length) - # empty object + with EventsFileReader(log_file) as reader: assert reader.is_open() count = 0 @@ -77,9 +112,16 @@ def test_write_read(self, log_file: Path) -> None: assert message.stamp == count count += 1 - uris = reader.uris() + # test get/has uris + assert not reader._event_index + uris: List[uri_pb2.Uri] = reader.uris() + assert reader._event_index - # TODO This api is likely in flux + assert len(reader.uris()) == 2, uris + assert uris[0].path == "hello" + assert uris[1].path == "world" + + # TODO(edgar/ethan): This api is likely in flux # there are several ways we want to seek and # iterate over the reader # frame order, per uri @@ -88,18 +130,17 @@ def test_write_read(self, log_file: Path) -> None: # # Perhaps you can just get a list of all events # and filter it at the user level. - # Then: reader.read(offset, event) will seek to and read the given event - for event, offset in reader.events(uri=uris[0]): - print(offset, event.timestamps[-1].stamp) + # Then: reader.read_message(event, offset) will seek to and read the given event + for uri in uris: + assert reader.has_uri(uri) - assert len(reader.uris()) == 2, uris - assert uris[0].path == "hello" - assert uris[1].path == "world" - assert reader.num_events(uris[0]) == 10 - assert reader.num_events(uris[1]) == 10 - for frame_n in range(reader.num_events(uris[0])): - event, offset = reader.get_event(uri=uris[0], frame_id=frame_n) - message = reader.read_message(event, offset) - assert message.stamp == frame_n + # test get/has events + events: List[Tuple[event_pb2.Event, int]] = reader.events(uri) + assert len(events) == reader.num_events(uri) == num_events + + for frame_n in range(reader.num_events(uri)): + event, offset = reader.get_event(uri, frame_n) + message = reader.read_message(event, offset) + assert message.stamp == frame_n assert reader.close() diff --git a/py/tests/tests/test_stamp.py b/py/tests/tests/test_stamp.py new file mode 100644 index 00000000..cd7a7e33 --- /dev/null +++ b/py/tests/tests/test_stamp.py @@ -0,0 +1,10 @@ +from farm_ng.core import timestamp_pb2 +from farm_ng.core.stamp import get_monotonic_now + + +def test_monotonic(): + stamp = get_monotonic_now(semantics="test/monotonic") + assert isinstance(stamp, timestamp_pb2.Timestamp) + assert isinstance(stamp.stamp, float) + assert "monotonic" in stamp.clock_name + assert stamp.semantics == "test/monotonic" diff --git a/py/tests/tests/test_uri.py b/py/tests/tests/test_uri.py new file mode 100644 index 00000000..68e6bd79 --- /dev/null +++ b/py/tests/tests/test_uri.py @@ -0,0 +1,20 @@ +from farm_ng.core import uri_pb2 +from farm_ng.core import timestamp_pb2 +from farm_ng.core.uri import make_proto_uri, uri_to_string, string_to_uri + + +def test_make_proto(): + stamp = timestamp_pb2.Timestamp( + stamp=1.2, clock_name="clock0", semantics="test/proto" + ) + uri: uri_pb2.Uri = make_proto_uri("tik/tok", stamp) + assert uri.scheme == "protobuf" + assert uri.path == "tik/tok" + assert uri.query == "type=farm_ng.core.proto.Timestamp" + + uri_str: str = uri_to_string(uri) + assert "protobuf://" in uri_str + assert "/tik/tok?type=farm_ng.core.proto.Timestamp" in uri_str + + uri_out: uri_pb2.Uri = string_to_uri(uri_str) + assert uri == uri_out From 3e7f070a9cc9f420dcc0e7e4637f23f9f798b16c Mon Sep 17 00:00:00 2001 From: edgar Date: Mon, 10 Oct 2022 15:36:35 +0200 Subject: [PATCH 28/29] fix length test --- py/tests/tests/test_core.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/py/tests/tests/test_core.py b/py/tests/tests/test_core.py index 6a6157c7..54048f3d 100644 --- a/py/tests/tests/test_core.py +++ b/py/tests/tests/test_core.py @@ -38,9 +38,8 @@ def test_write_stamp(self, log_file: Path) -> None: writer = EventsFileWriter(log_file) assert writer.open() writer.write("test/uri", message=get_monotonic_now(semantics="test/monotonic")) - assert writer.file_length == 217 writer.write("test/uri", message=get_monotonic_now(semantics="test/monotonic")) - assert writer.file_length == 434 + assert writer.file_length > 0 assert writer.close() @@ -81,7 +80,7 @@ def test_open_close(self, log_file: Path) -> None: # open object assert reader.open() - assert reader.file_length == 220 + assert reader.file_length > 0 assert not reader.is_closed() assert reader.is_open() assert reader.close() From 9b94b079fb3f33cb161dc1a77af4b09b7c1aeefb Mon Sep 17 00:00:00 2001 From: edgar Date: Mon, 10 Oct 2022 15:38:44 +0200 Subject: [PATCH 29/29] enable mypy again --- .github/workflows/ci_python.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/ci_python.yml b/.github/workflows/ci_python.yml index fe52f7ed..e7c9134d 100644 --- a/.github/workflows/ci_python.yml +++ b/.github/workflows/ci_python.yml @@ -26,6 +26,4 @@ jobs: run: pip3 install -e .[dev] - name: Run Tests working-directory: py - # TODO: enable mypy later - # run: pytest -v tests/ --mypy - run: pytest -v tests/ + run: pytest -v tests/ --mypy