Fix/columns to csv (#9)
* Moves columns from metadata to body

The maximum metadata size is 8 KB, which became a problem when a dataset
has many columns.

* Removes redundant parameter

encoding='utf-8' is already the default value

* New metadata storage solution

MinIO/S3 object metadata has several limitations:
a maximum size of 8 KB, values restricted to Dict[str, str], ...

We hit this when saving a file with many columns (~1000): the feature
types could not be stored as object metadata because they exceeded the
8 KB limit.

As a solution, the column names are now the first row of the CSV file,
and the metadata is stored in a separate JSON-encoded file.

The unit test was modified to use a large dataset (1,000 columns, 1 million rows).
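
For illustration, a minimal sketch (not the library code) of what one saved
dataset looks like after this change, using the constants from the diff below
(PREFIX="datasets", FILE_EXTENSION=".csv", METADATA_EXTENSION=".metadata");
the dataset name "iris" is illustrative:

```python
from json import dumps

# datasets/iris.csv -- the column names are now the first row of the CSV body:
csv_body = "col0,col1,col2\n01/01/2000,5.1,Iris-setosa\n"

# datasets/iris.metadata -- a separate JSON-encoded object, free of the 8 KB
# and Dict[str, str] limits that apply to S3/MinIO object metadata:
metadata_body = dumps({
    "featuretypes": ["DateTime", "Numerical", "Categorical"],
    "filename": "iris.data",
})
```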

* Removes compression, since it does not yet work in pandas

See: pandas-dev/pandas#22555
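
Concretely (a hedged sketch of the pinned pandas 0.25 behavior described in
that issue): `to_csv` applies `compression` only when given a path, and
ignores it when given an open file handle, which is how we write through
S3FS:

```python
import pandas as pd

df = pd.DataFrame({"col0": [1, 2, 3]})

# Works: pandas opens the path itself and applies gzip compression.
df.to_csv("data.csv.gz", compression="gzip")

# Broken in pandas 0.25 (pandas-dev/pandas#22555): with an open file object,
# as used with S3FS.open(), the compression argument is ignored and plain
# text is written instead.
with open("data.csv.gz", "w") as f:
    df.to_csv(f, compression="gzip")
```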

* Fixes list_datasets issue

Two files are now created for each dataset, so the listing must filter
to .csv files to avoid duplicate entries.

* Decreases mock_dataset size

The large mock dataset was causing 502 Bad Gateway errors on play.min.io.

* Improves code when reading saved metrics
fberanizo authored Apr 17, 2020
1 parent 867bf1a commit 7eecb84
Showing 6 changed files with 111 additions and 98 deletions.
96 changes: 46 additions & 50 deletions platiagro/datasets.py
@@ -1,16 +1,17 @@
# -*- coding: utf-8 -*-
from io import BytesIO
from json import dumps, loads
from os import SEEK_SET
from os.path import join
from typing import List, Dict, Optional

import pandas as pd
from minio.error import NoSuchBucket, NoSuchKey

from .util import BUCKET_NAME, MINIO_CLIENT, make_bucket
from .util import BUCKET_NAME, MINIO_CLIENT, S3FS, make_bucket

PREFIX = "datasets"
FILE_EXTENSION = ".csv"
METADATA_EXTENSION = ".metadata"


def list_datasets() -> List[str]:
@@ -26,8 +27,9 @@ def list_datasets() -> List[str]:

objects = MINIO_CLIENT.list_objects_v2(BUCKET_NAME, PREFIX + "/")
for obj in objects:
name = obj.object_name[len(PREFIX) + 1:]
datasets.append(name)
if obj.object_name.endswith(FILE_EXTENSION):
name = obj.object_name[len(PREFIX) + 1:-len(FILE_EXTENSION)]
datasets.append(name)
return datasets


@@ -44,24 +46,14 @@ def load_dataset(name: str) -> pd.DataFrame:
FileNotFoundError: If dataset does not exist in the object storage.
"""
try:
object_name = join(PREFIX, name)

data = MINIO_CLIENT.get_object(
bucket_name=BUCKET_NAME,
object_name=object_name,
path = join("s3://", BUCKET_NAME, PREFIX, name + FILE_EXTENSION)
return pd.read_csv(
S3FS.open(path),
header=0,
index_col=False,
)
except (NoSuchBucket, NoSuchKey):
raise FileNotFoundError("No such file or directory: '{}'".format(name))

metadata = stat_dataset(name)
columns = metadata["columns"]

csv_buffer = BytesIO()
for d in data.stream(32*1024):
csv_buffer.write(d)
csv_buffer.seek(0, SEEK_SET)
df = pd.read_csv(csv_buffer, header=None, names=columns, index_col=False)
return df
except FileNotFoundError:
raise FileNotFoundError("The specified dataset does not exist")


def save_dataset(name: str,
@@ -74,36 +66,30 @@ def save_dataset(name: str,
df (pandas.DataFrame): the dataset as a `pandas.DataFrame`.
metadata (dict, optional): metadata about the dataset. Defaults to None.
"""
object_name = join(PREFIX, name)
# ensures MinIO bucket exists
make_bucket(BUCKET_NAME)

columns = df.columns.values.tolist()
# uploads file to MinIO
path = join(BUCKET_NAME, PREFIX, name + FILE_EXTENSION)
df.to_csv(
S3FS.open(path, "w"),
header=True,
index=False,
)

if metadata is None:
metadata = {}

# will store columns as metadata
metadata["columns"] = columns

# tries to encode metadata as json
# obs: MinIO requires the metadata to be a Dict[str, str]
for k, v in metadata.items():
metadata[str(k)] = dumps(v)

# converts DataFrame to bytes-like
csv_bytes = df.to_csv(header=False, index=False).encode("utf-8")
csv_buffer = BytesIO(csv_bytes)
file_length = len(csv_bytes)

# ensures MinIO bucket exists
make_bucket(BUCKET_NAME)
# encodes metadata to JSON format
buffer = BytesIO(dumps(metadata).encode())

# uploads file to MinIO
# uploads metadata to MinIO
object_name = join(PREFIX, name + METADATA_EXTENSION)
MINIO_CLIENT.put_object(
bucket_name=BUCKET_NAME,
object_name=object_name,
data=csv_buffer,
length=file_length,
metadata=metadata,
data=buffer,
length=buffer.getbuffer().nbytes,
)


@@ -119,19 +105,29 @@ def stat_dataset(name: str) -> Dict[str, str]:
Raises:
FileNotFoundError: If dataset does not exist in the object storage.
"""
metadata = {}
try:
object_name = join(PREFIX, name)
stat = MINIO_CLIENT.stat_object(
# reads the .metadata file
object_name = join(PREFIX, name + METADATA_EXTENSION)
data = MINIO_CLIENT.get_object(
bucket_name=BUCKET_NAME,
object_name=object_name,
)
# decodes the metadata (which is in JSON format)
metadata = loads(data.read())

# also reads the 1st line of data: column names
path = join("s3://", BUCKET_NAME, PREFIX, name + FILE_EXTENSION)
df = pd.read_csv(
S3FS.open(path),
header=0,
index_col=False,
nrows=1,
)
columns = df.columns.tolist()
metadata["columns"] = columns

metadata = {}
for k, v in stat.metadata.items():
if k.startswith("X-Amz-Meta-"):
key = k[len("X-Amz-Meta-"):].lower()
metadata[key] = loads(v)
except (NoSuchBucket, NoSuchKey):
raise FileNotFoundError("No such file or directory: '{}'".format(name))
raise FileNotFoundError("The specified dataset does not exist")

return metadata
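
Taken together, a hedged usage sketch of the resulting datasets API (the
dataset name and values here are illustrative):

```python
import pandas as pd

from platiagro import list_datasets, load_dataset, save_dataset, stat_dataset

df = pd.DataFrame({"col0": ["01/01/2000"], "col1": [5.1]})

# Writes two objects: datasets/mydata.csv (header row + data, via s3fs)
# and datasets/mydata.metadata (a JSON-encoded dict, via the MinIO client).
save_dataset("mydata", df, metadata={"filename": "mydata.data"})

assert "mydata" in list_datasets()  # .metadata objects are filtered out
df2 = load_dataset("mydata")        # header=0 restores the column names
meta = stat_dataset("mydata")       # JSON metadata plus a "columns" key
```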
2 changes: 1 addition & 1 deletion platiagro/figures.py
@@ -38,7 +38,7 @@ def list_figures(experiment_id: str, operator_id: str) -> List[str]:
buffer = b""
for d in data.stream(32*1024):
buffer += d
encoded_figure = b64encode(buffer).decode("utf8")
encoded_figure = b64encode(buffer).decode()
figure = "data:image/png;base64,{}".format(encoded_figure)
figures.append(figure)
return figures
7 changes: 2 additions & 5 deletions platiagro/metrics.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
from json import loads, dumps
from io import BytesIO
from json import loads, dumps
from os.path import join
from typing import Dict

@@ -42,10 +42,7 @@ def save_metrics(experiment_id: str, operator_id: str, reset: bool = False,
bucket_name=BUCKET_NAME,
object_name=object_name,
)
buffer = b""
for d in data.stream(32*1024):
buffer += d
encoded_metrics = loads(buffer.decode("utf-8"))
encoded_metrics = loads(data.read())
except NoSuchKey:
pass

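
Why the one-liner is safe (a sketch under two assumptions: the MinIO client
returns a file-like urllib3 response, and Python >= 3.6, where `json.loads`
accepts bytes; the object name below is illustrative):

```python
from json import loads
from os.path import join

from platiagro.util import BUCKET_NAME, MINIO_CLIENT

# Illustrative object name; the real code builds it from experiment/operator ids.
object_name = join("metrics", "example.json")

response = MINIO_CLIENT.get_object(
    bucket_name=BUCKET_NAME,
    object_name=object_name,
)
metrics = loads(response.read())  # one read() replaces the 32 KB stream loop
```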
10 changes: 10 additions & 0 deletions platiagro/util.py
@@ -3,6 +3,7 @@

from minio import Minio
from minio.error import BucketAlreadyOwnedByYou
from s3fs.core import S3FileSystem

BUCKET_NAME = "anonymous"

@@ -14,6 +15,15 @@
secure=False,
)

S3FS = S3FileSystem(
key=getenv("MINIO_ACCESS_KEY", "minio"),
secret=getenv("MINIO_SECRET_KEY", "minio123"),
use_ssl=False,
client_kwargs={
"endpoint_url": "http://{}".format(getenv("MINIO_ENDPOINT", "minio-service.kubeflow:9000")),
}
)


def make_bucket(name):
"""Creates the bucket in MinIO. Ignores exception if bucket already exists.
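
For context, a hedged sketch of how the new S3FS handle is used by
datasets.py (the path below is illustrative; endpoint and credentials come
from the environment variables above):

```python
import pandas as pd

from platiagro.util import BUCKET_NAME, S3FS

# s3fs exposes MinIO objects as file-like handles, so pandas can read and
# write CSVs directly instead of buffering whole files in memory.
path = "s3://{}/datasets/example.csv".format(BUCKET_NAME)
with S3FS.open(path, "w") as f:
    pd.DataFrame({"col0": [1, 2]}).to_csv(f, header=True, index=False)

with S3FS.open(path) as f:
    df = pd.read_csv(f, header=0, index_col=False)
```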
4 changes: 3 additions & 1 deletion requirements.txt
@@ -6,4 +6,6 @@ pandas==0.25.3
# on Python objects containing large data
joblib==0.14.1
# Makes statistical graphics
seaborn==0.10.0
seaborn==0.10.0
# Filesystem interface for S3
s3fs==0.4.2
90 changes: 49 additions & 41 deletions tests/test_datasets.py
@@ -6,7 +6,8 @@
from minio.error import BucketAlreadyOwnedByYou
import pandas as pd

from platiagro import list_datasets, load_dataset, save_dataset, stat_dataset
from platiagro import list_datasets, load_dataset, save_dataset, stat_dataset, \
DATETIME, CATEGORICAL, NUMERICAL
from platiagro.util import BUCKET_NAME, MINIO_CLIENT


@@ -28,25 +29,39 @@ def make_bucket(self):
except BucketAlreadyOwnedByYou:
pass

def create_mock_dataset(self):
file = BytesIO(b"01/01/2000,5.1,3.5,1.4,0.2,Iris-setosa\n" +
b"01/01/2001,4.9,3.0,1.4,0.2,Iris-setosa\n" +
b"01/01/2002,4.7,3.2,1.3,0.2,Iris-setosa\n" +
b"01/01/2003,4.6,3.1,1.5,0.2,Iris-setosa")
columns = ["col0", "col1", "col2", "col3", "col4", "col5"]
featuretypes = ["DateTime", "Numerical", "Numerical",
"Numerical", "Numerical", "Categorical"]
def mock_columns(self, size=1e3):
return ["col{}".format(i) for i in range(int(size))]

def mock_values(self, size=1e3):
values = ["01/01/2000", 5.1, 3.5, 1.4, 0.2, "Iris-setosa"]
return [values[i % len(values)] for i in range(int(size))]

def mock_featuretypes(self, size=1e3):
ftypes = [DATETIME, NUMERICAL, NUMERICAL,
NUMERICAL, NUMERICAL, CATEGORICAL]
return [ftypes[i % len(ftypes)] for i in range(int(size))]

def create_mock_dataset(self, size=1e2):
header = ",".join(self.mock_columns()) + "\n"
rows = "\n".join([",".join([str(v) for v in self.mock_values()])
for x in range(int(size))])
buffer = BytesIO((header + rows).encode())
MINIO_CLIENT.put_object(
bucket_name=BUCKET_NAME,
object_name="datasets/mock.csv",
data=buffer,
length=buffer.getbuffer().nbytes,
)
metadata = {
"columns": dumps(columns),
"featuretypes": dumps(featuretypes),
"filename": dumps("iris.data"),
"featuretypes": self.mock_featuretypes(),
"filename": "mock.data",
}
buffer = BytesIO(dumps(metadata).encode())
MINIO_CLIENT.put_object(
bucket_name=BUCKET_NAME,
object_name="datasets/iris",
data=file,
length=file.getbuffer().nbytes,
metadata=metadata,
object_name="datasets/mock.metadata",
data=buffer,
length=buffer.getbuffer().nbytes,
)

def test_list_datasets(self):
@@ -57,47 +72,40 @@ def test_load_dataset(self):
with self.assertRaises(FileNotFoundError):
load_dataset("UNK")

expected = pd.DataFrame({
"col0": ["01/01/2000", "01/01/2001", "01/01/2002", "01/01/2003"],
"col1": [5.1, 4.9, 4.7, 4.6],
"col2": [3.5, 3.0, 3.2, 3.1],
"col3": [1.4, 1.4, 1.3, 1.5],
"col4": [0.2, 0.2, 0.2, 0.2],
"col5": ["Iris-setosa", "Iris-setosa", "Iris-setosa", "Iris-setosa"],
})
result = load_dataset("iris")
expected = pd.DataFrame(
data=[self.mock_values() for x in range(int(1e2))],
columns=self.mock_columns(),
)
result = load_dataset("mock")
self.assertTrue(result.equals(expected))

def test_save_dataset(self):
df = pd.DataFrame({"col0": []})
save_dataset("test", df)

df = pd.DataFrame({
"col0": ["2000-01-01", "2001-01-01", "2002-01-01", "2003-01-01"],
"col1": [5.1, 4.9, 4.7, 4.6],
"col2": [3.5, 3.0, 3.2, 3.1],
"col3": [1.4, 1.4, 1.3, 1.5],
"col4": [0.2, float('nan'), 0.2, 0.2],
"col5": ["Iris-setosa", "Iris-setosa", "Iris-setosa", "Iris-setosa"],
})
df = pd.DataFrame(
data=[self.mock_values() for x in range(int(1e2))],
columns=self.mock_columns(),
)
save_dataset("test", df)

df = pd.DataFrame({"col0": []})
df = pd.DataFrame(
data=[self.mock_values() for x in range(int(1e2))],
columns=self.mock_columns(),
)
save_dataset("test", df, metadata={
"filename": "test.data",
"featuretypes": ["DateTime", "Numerical", "Numerical",
"Numerical", "Numerical", "Categorical"],
"featuretypes": self.mock_featuretypes(),
})

def test_stat_dataset(self):
with self.assertRaises(FileNotFoundError):
stat_dataset("UNK")

expected = {
"columns": ["col0", "col1", "col2", "col3", "col4", "col5"],
"featuretypes": ["DateTime", "Numerical", "Numerical",
"Numerical", "Numerical", "Categorical"],
"filename": "iris.data",
"columns": self.mock_columns(),
"featuretypes": self.mock_featuretypes(),
"filename": "mock.data",
}
result = stat_dataset("iris")
result = stat_dataset("mock")
self.assertDictEqual(result, expected)
