From 7eecb84fe52306b58ff055418be85bb2953d6d4b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?F=C3=A1bio=20Beranizo=20Fontes=20Lopes?=
Date: Thu, 16 Apr 2020 23:39:50 -0300
Subject: [PATCH] Fix/columns to csv (#9)

* Moves columns from metadata to body

The maximum metadata size is 8 KB, which became a problem for datasets
with many columns.

* Removes redundant parameter

encoding='utf-8' is already the default value.

* New metadata storage solution

MinIO/S3 object metadata has several limitations: a maximum size of
8 KB, values restricted to Dict[str, str], and so on. We hit this when
saving a file with many columns (~1000): the feature types could not be
stored as object metadata because they exceeded the 8 KB limit. As a
solution, the columns are now the first row of the CSV file and the
metadata is stored in a separate file, encoded as JSON. A usage sketch
of the new layout is appended as a note after the diff. The unit tests
were updated to use a large dataset (1000 columns, 1 million rows).

* Removes compression, since it does not work yet in pandas

See: https://github.com/pandas-dev/pandas/issues/22555

* Fixes list_datasets issue

Two files are now created for each dataset, so only ".csv" objects are
listed as datasets.

* Decreases mock_dataset size

The previous size was causing 502 Bad Gateway errors on play.min.io.

* Improves code when reading saved metrics
---
 platiagro/datasets.py  | 96 ++++++++++++++++++++----------------------
 platiagro/figures.py   |  2 +-
 platiagro/metrics.py   |  7 +--
 platiagro/util.py      | 10 +++++
 requirements.txt       |  4 +-
 tests/test_datasets.py | 90 +++++++++++++++++++++------------------
 6 files changed, 111 insertions(+), 98 deletions(-)

diff --git a/platiagro/datasets.py b/platiagro/datasets.py
index 49bc7b4..573049a 100644
--- a/platiagro/datasets.py
+++ b/platiagro/datasets.py
@@ -1,16 +1,17 @@
 # -*- coding: utf-8 -*-
 from io import BytesIO
 from json import dumps, loads
-from os import SEEK_SET
 from os.path import join
 from typing import List, Dict, Optional
 
 import pandas as pd
 from minio.error import NoSuchBucket, NoSuchKey
 
-from .util import BUCKET_NAME, MINIO_CLIENT, make_bucket
+from .util import BUCKET_NAME, MINIO_CLIENT, S3FS, make_bucket
 
 PREFIX = "datasets"
+FILE_EXTENSION = ".csv"
+METADATA_EXTENSION = ".metadata"
 
 
 def list_datasets() -> List[str]:
@@ -26,8 +27,9 @@ def list_datasets() -> List[str]:
     objects = MINIO_CLIENT.list_objects_v2(BUCKET_NAME, PREFIX + "/")
 
     for obj in objects:
-        name = obj.object_name[len(PREFIX) + 1:]
-        datasets.append(name)
+        if obj.object_name.endswith(FILE_EXTENSION):
+            name = obj.object_name[len(PREFIX) + 1:-len(FILE_EXTENSION)]
+            datasets.append(name)
     return datasets
 
 
@@ -44,24 +46,14 @@ def load_dataset(name: str) -> pd.DataFrame:
         FileNotFoundError: If dataset does not exist in the object storage.
     """
     try:
-        object_name = join(PREFIX, name)
-
-        data = MINIO_CLIENT.get_object(
-            bucket_name=BUCKET_NAME,
-            object_name=object_name,
+        path = join("s3://", BUCKET_NAME, PREFIX, name + FILE_EXTENSION)
+        return pd.read_csv(
+            S3FS.open(path),
+            header=0,
+            index_col=False,
         )
-    except (NoSuchBucket, NoSuchKey):
-        raise FileNotFoundError("No such file or directory: '{}'".format(name))
-
-    metadata = stat_dataset(name)
-    columns = metadata["columns"]
-
-    csv_buffer = BytesIO()
-    for d in data.stream(32*1024):
-        csv_buffer.write(d)
-    csv_buffer.seek(0, SEEK_SET)
-    df = pd.read_csv(csv_buffer, header=None, names=columns, index_col=False)
-    return df
+    except FileNotFoundError:
+        raise FileNotFoundError("The specified dataset does not exist")
 
 
 def save_dataset(name: str,
@@ -74,36 +66,30 @@ def save_dataset(name: str,
         df (pandas.DataFrame): the dataset as a `pandas.DataFrame`.
         metadata (dict, optional): metadata about the dataset. Defaults to None.
""" - object_name = join(PREFIX, name) + # ensures MinIO bucket exists + make_bucket(BUCKET_NAME) - columns = df.columns.values.tolist() + # uploads file to MinIO + path = join(BUCKET_NAME, PREFIX, name + FILE_EXTENSION) + df.to_csv( + S3FS.open(path, "w"), + header=True, + index=False, + ) if metadata is None: metadata = {} - # will store columns as metadata - metadata["columns"] = columns - - # tries to encode metadata as json - # obs: MinIO requires the metadata to be a Dict[str, str] - for k, v in metadata.items(): - metadata[str(k)] = dumps(v) - - # converts DataFrame to bytes-like - csv_bytes = df.to_csv(header=False, index=False).encode("utf-8") - csv_buffer = BytesIO(csv_bytes) - file_length = len(csv_bytes) - - # ensures MinIO bucket exists - make_bucket(BUCKET_NAME) + # encodes metadata to JSON format + buffer = BytesIO(dumps(metadata).encode()) - # uploads file to MinIO + # uploads metadata to MinIO + object_name = join(PREFIX, name + METADATA_EXTENSION) MINIO_CLIENT.put_object( bucket_name=BUCKET_NAME, object_name=object_name, - data=csv_buffer, - length=file_length, - metadata=metadata, + data=buffer, + length=buffer.getbuffer().nbytes, ) @@ -119,19 +105,29 @@ def stat_dataset(name: str) -> Dict[str, str]: Raises: FileNotFoundError: If dataset does not exist in the object storage. """ + metadata = {} try: - object_name = join(PREFIX, name) - stat = MINIO_CLIENT.stat_object( + # reads the .metadata file + object_name = join(PREFIX, name + METADATA_EXTENSION) + data = MINIO_CLIENT.get_object( bucket_name=BUCKET_NAME, object_name=object_name, ) + # decodes the metadata (which is in JSON format) + metadata = loads(data.read()) + + # also reads the 1st line of data: column names + path = join("s3://", BUCKET_NAME, PREFIX, name + FILE_EXTENSION) + df = pd.read_csv( + S3FS.open(path), + header=0, + index_col=False, + nrows=1, + ) + columns = df.columns.tolist() + metadata["columns"] = columns - metadata = {} - for k, v in stat.metadata.items(): - if k.startswith("X-Amz-Meta-"): - key = k[len("X-Amz-Meta-"):].lower() - metadata[key] = loads(v) except (NoSuchBucket, NoSuchKey): - raise FileNotFoundError("No such file or directory: '{}'".format(name)) + raise FileNotFoundError("The specified dataset does not exist") return metadata diff --git a/platiagro/figures.py b/platiagro/figures.py index 0d0b655..dc9244f 100644 --- a/platiagro/figures.py +++ b/platiagro/figures.py @@ -38,7 +38,7 @@ def list_figures(experiment_id: str, operator_id: str) -> List[str]: buffer = b"" for d in data.stream(32*1024): buffer += d - encoded_figure = b64encode(buffer).decode("utf8") + encoded_figure = b64encode(buffer).decode() figure = "data:image/png;base64,{}".format(encoded_figure) figures.append(figure) return figures diff --git a/platiagro/metrics.py b/platiagro/metrics.py index 6db7714..42b655a 100644 --- a/platiagro/metrics.py +++ b/platiagro/metrics.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -from json import loads, dumps from io import BytesIO +from json import loads, dumps from os.path import join from typing import Dict @@ -42,10 +42,7 @@ def save_metrics(experiment_id: str, operator_id: str, reset: bool = False, bucket_name=BUCKET_NAME, object_name=object_name, ) - buffer = b"" - for d in data.stream(32*1024): - buffer += d - encoded_metrics = loads(buffer.decode("utf-8")) + encoded_metrics = loads(data.read()) except NoSuchKey: pass diff --git a/platiagro/util.py b/platiagro/util.py index 638a173..a71f36b 100644 --- a/platiagro/util.py +++ b/platiagro/util.py @@ -3,6 +3,7 @@ from minio import 
 from minio.error import BucketAlreadyOwnedByYou
+from s3fs.core import S3FileSystem
 
 BUCKET_NAME = "anonymous"
@@ -14,6 +15,15 @@
     secure=False,
 )
 
+S3FS = S3FileSystem(
+    key=getenv("MINIO_ACCESS_KEY", "minio"),
+    secret=getenv("MINIO_SECRET_KEY", "minio123"),
+    use_ssl=False,
+    client_kwargs={
+        "endpoint_url": "http://{}".format(getenv("MINIO_ENDPOINT", "minio-service.kubeflow:9000")),
+    }
+)
+
 
 def make_bucket(name):
     """Creates the bucket in MinIO. Ignores exception if bucket already exists.
diff --git a/requirements.txt b/requirements.txt
index 984293e..b5aaa10 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,4 +6,6 @@ pandas==0.25.3
 # on Python objects containing large data
 joblib==0.14.1
 # Makes statistical graphics
-seaborn==0.10.0
\ No newline at end of file
+seaborn==0.10.0
+# Filesystem interface for S3
+s3fs==0.4.2
\ No newline at end of file
diff --git a/tests/test_datasets.py b/tests/test_datasets.py
index c96443b..436eb71 100644
--- a/tests/test_datasets.py
+++ b/tests/test_datasets.py
@@ -6,7 +6,8 @@
 from minio.error import BucketAlreadyOwnedByYou
 import pandas as pd
 
-from platiagro import list_datasets, load_dataset, save_dataset, stat_dataset
+from platiagro import list_datasets, load_dataset, save_dataset, stat_dataset, \
+    DATETIME, CATEGORICAL, NUMERICAL
 from platiagro.util import BUCKET_NAME, MINIO_CLIENT
 
 
@@ -28,25 +29,39 @@ def make_bucket(self):
         except BucketAlreadyOwnedByYou:
             pass
 
-    def create_mock_dataset(self):
-        file = BytesIO(b"01/01/2000,5.1,3.5,1.4,0.2,Iris-setosa\n" +
-                       b"01/01/2001,4.9,3.0,1.4,0.2,Iris-setosa\n" +
-                       b"01/01/2002,4.7,3.2,1.3,0.2,Iris-setosa\n" +
-                       b"01/01/2003,4.6,3.1,1.5,0.2,Iris-setosa")
-        columns = ["col0", "col1", "col2", "col3", "col4", "col5"]
-        featuretypes = ["DateTime", "Numerical", "Numerical",
-                        "Numerical", "Numerical", "Categorical"]
+    def mock_columns(self, size=1e3):
+        return ["col{}".format(i) for i in range(int(size))]
+
+    def mock_values(self, size=1e3):
+        values = ["01/01/2000", 5.1, 3.5, 1.4, 0.2, "Iris-setosa"]
+        return [values[i % len(values)] for i in range(int(size))]
+
+    def mock_featuretypes(self, size=1e3):
+        ftypes = [DATETIME, NUMERICAL, NUMERICAL,
+                  NUMERICAL, NUMERICAL, CATEGORICAL]
+        return [ftypes[i % len(ftypes)] for i in range(int(size))]
+
+    def create_mock_dataset(self, size=1e2):
+        header = ",".join(self.mock_columns()) + "\n"
+        rows = "\n".join([",".join([str(v) for v in self.mock_values()])
+                          for x in range(int(size))])
+        buffer = BytesIO((header + rows).encode())
+        MINIO_CLIENT.put_object(
+            bucket_name=BUCKET_NAME,
+            object_name="datasets/mock.csv",
+            data=buffer,
+            length=buffer.getbuffer().nbytes,
+        )
         metadata = {
-            "columns": dumps(columns),
-            "featuretypes": dumps(featuretypes),
-            "filename": dumps("iris.data"),
+            "featuretypes": self.mock_featuretypes(),
+            "filename": "mock.data",
         }
+        buffer = BytesIO(dumps(metadata).encode())
         MINIO_CLIENT.put_object(
             bucket_name=BUCKET_NAME,
-            object_name="datasets/iris",
-            data=file,
-            length=file.getbuffer().nbytes,
-            metadata=metadata,
+            object_name="datasets/mock.metadata",
+            data=buffer,
+            length=buffer.getbuffer().nbytes,
         )
 
     def test_list_datasets(self):
@@ -57,36 +72,30 @@ def test_load_dataset(self):
         with self.assertRaises(FileNotFoundError):
             load_dataset("UNK")
 
-        expected = pd.DataFrame({
-            "col0": ["01/01/2000", "01/01/2001", "01/01/2002", "01/01/2003"],
-            "col1": [5.1, 4.9, 4.7, 4.6],
-            "col2": [3.5, 3.0, 3.2, 3.1],
-            "col3": [1.4, 1.4, 1.3, 1.5],
-            "col4": [0.2, 0.2, 0.2, 0.2],
-            "col5": ["Iris-setosa", "Iris-setosa", "Iris-setosa", "Iris-setosa"],
"Iris-setosa"], - }) - result = load_dataset("iris") + expected = pd.DataFrame( + data=[self.mock_values() for x in range(int(1e2))], + columns=self.mock_columns(), + ) + result = load_dataset("mock") self.assertTrue(result.equals(expected)) def test_save_dataset(self): df = pd.DataFrame({"col0": []}) save_dataset("test", df) - df = pd.DataFrame({ - "col0": ["2000-01-01", "2001-01-01", "2002-01-01", "2003-01-01"], - "col1": [5.1, 4.9, 4.7, 4.6], - "col2": [3.5, 3.0, 3.2, 3.1], - "col3": [1.4, 1.4, 1.3, 1.5], - "col4": [0.2, float('nan'), 0.2, 0.2], - "col5": ["Iris-setosa", "Iris-setosa", "Iris-setosa", "Iris-setosa"], - }) + df = pd.DataFrame( + data=[self.mock_values() for x in range(int(1e2))], + columns=self.mock_columns(), + ) save_dataset("test", df) - df = pd.DataFrame({"col0": []}) + df = pd.DataFrame( + data=[self.mock_values() for x in range(int(1e2))], + columns=self.mock_columns(), + ) save_dataset("test", df, metadata={ "filename": "test.data", - "featuretypes": ["DateTime", "Numerical", "Numerical", - "Numerical", "Numerical", "Categorical"], + "featuretypes": self.mock_featuretypes(), }) def test_stat_dataset(self): @@ -94,10 +103,9 @@ def test_stat_dataset(self): stat_dataset("UNK") expected = { - "columns": ["col0", "col1", "col2", "col3", "col4", "col5"], - "featuretypes": ["DateTime", "Numerical", "Numerical", - "Numerical", "Numerical", "Categorical"], - "filename": "iris.data", + "columns": self.mock_columns(), + "featuretypes": self.mock_featuretypes(), + "filename": "mock.data", } - result = stat_dataset("iris") + result = stat_dataset("mock") self.assertDictEqual(result, expected)