Slightly better Typechecking when exporting to SQL #174

Merged
merged 21 commits on May 18, 2023
Changes from all commits
1 change: 0 additions & 1 deletion pyerrors/input/json.py
@@ -479,7 +479,6 @@ def import_json_string(json_string, verbose=True, full_output=False):
    result : dict
        if full_output=True
    """

    return _parse_json_dict(json.loads(json_string), verbose, full_output)


52 changes: 36 additions & 16 deletions pyerrors/input/pandas.py
@@ -5,6 +5,7 @@
from ..obs import Obs
from ..correlators import Corr
from .json import create_json_string, import_json_string
import numpy as np


def to_sql(df, table_name, db, if_exists='fail', gz=True, **kwargs):
@@ -76,6 +77,13 @@ def dump_df(df, fname, gz=True):
    -------
    None
    """
    for column in df:
        serialize = _need_to_serialize(df[column])
        if not serialize:
            if all(isinstance(entry, (int, np.integer, float, np.floating)) for entry in df[column]):
                if any(np.isnan(entry) for entry in df[column]):
                    warnings.warn("nan value in column " + column + " will be replaced by None", UserWarning)

    out = _serialize_df(df, gz=False)

    if not fname.endswith('.csv'):
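The practical effect of the new check in dump_df: a purely numeric column containing NaN now triggers a UserWarning at dump time, because the CSV round trip will hand the value back as None. A minimal sketch of that behavior, assuming a pyerrors build that includes this patch (the file stem 'nan_demo' is illustrative):

import numpy as np
import pandas as pd
import pyerrors as pe

# Mixing an int with np.nan makes the column float-typed, so the new
# all-numeric check applies and the NaN is flagged.
df = pd.DataFrame({"int": [1, np.nan]})

# Warns: "nan value in column int will be replaced by None"
pe.input.pandas.dump_df(df, "nan_demo")  # writes nan_demo.csv.gz by default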
Expand Down Expand Up @@ -114,11 +122,11 @@ def load_df(fname, auto_gamma=False, gz=True):
        if not fname.endswith('.gz'):
            fname += '.gz'
        with gzip.open(fname) as f:
            re_import = pd.read_csv(f)
            re_import = pd.read_csv(f, keep_default_na=False)
    else:
        if fname.endswith('.gz'):
            warnings.warn("Trying to read from %s without unzipping!" % fname, UserWarning)
        re_import = pd.read_csv(fname)
        re_import = pd.read_csv(fname, keep_default_na=False)

    return _deserialize_df(re_import, auto_gamma=auto_gamma)

@@ -135,17 +143,12 @@ def _serialize_df(df, gz=False):
"""
out = df.copy()
for column in out:
serialize = False
if isinstance(out[column][0], (Obs, Corr)):
serialize = True
elif isinstance(out[column][0], list):
if all(isinstance(o, Obs) for o in out[column][0]):
serialize = True
serialize = _need_to_serialize(out[column])

if serialize is True:
out[column] = out[column].transform(lambda x: create_json_string(x, indent=0))
out[column] = out[column].transform(lambda x: create_json_string(x, indent=0) if x is not None else None)
if gz is True:
out[column] = out[column].transform(lambda x: gzip.compress(x.encode('utf-8')))
out[column] = out[column].transform(lambda x: gzip.compress((x if x is not None else '').encode('utf-8')))
return out


@@ -168,12 +171,29 @@ def _deserialize_df(df, auto_gamma=False):
        if isinstance(df[column][0], bytes):
            if df[column][0].startswith(b"\x1f\x8b\x08\x00"):
                df[column] = df[column].transform(lambda x: gzip.decompress(x).decode('utf-8'))
        if isinstance(df[column][0], str):
            if '"program":' in df[column][0][:20]:
                df[column] = df[column].transform(lambda x: import_json_string(x, verbose=False))
        df = df.replace({r'^$': None}, regex=True)
        i = 0
        while df[column][i] is None:
            i += 1
        if isinstance(df[column][i], str):
            if '"program":' in df[column][i][:20]:
                df[column] = df[column].transform(lambda x: import_json_string(x, verbose=False) if x is not None else None)
                if auto_gamma is True:
                    if isinstance(df[column][i], list):
                        df[column].apply(lambda x: [o.gm() for o in x] if x is not None else x)
                    else:
                        df[column].apply(lambda x: x.gm() if x is not None else x)
    return df


def _need_to_serialize(col):
    serialize = False
    i = 0
    while col[i] is None:
        i += 1
    if isinstance(col[i], (Obs, Corr)):
        serialize = True
    elif isinstance(col[i], list):
        if all(isinstance(o, Obs) for o in col[i]):
            serialize = True
    return serialize
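Aside: _need_to_serialize assumes every column has at least one non-None entry; scanning past leading None values is what keeps the type check from being fooled by a null first row. A minimal standalone sketch of that idiom (first_non_null is a hypothetical helper, not part of pyerrors; unlike the loop above it also covers the all-None case):

def first_non_null(col):
    # Return the first entry that is not None, or None if every entry is null.
    for entry in col:
        if entry is not None:
            return entry
    return None


assert first_non_null([None, None, 3.14]) == 3.14
assert first_non_null([None, None]) is None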
169 changes: 162 additions & 7 deletions tests/pandas_test.py
@@ -2,12 +2,14 @@
import pandas as pd
import pyerrors as pe
import pytest
import warnings


def test_df_export_import(tmp_path):
    my_dict = {"int": 1,
               "float": -0.01,
               "Obs1": pe.pseudo_Obs(87, 21, "test_ensemble"),
               "Obs2": pe.pseudo_Obs(-87, 21, "test_ensemble2")}
    for gz in [True, False]:
        my_df = pd.DataFrame([my_dict] * 10)

@@ -18,13 +20,166 @@ def test_df_export_import(tmp_path):
        pe.input.pandas.load_df((tmp_path / 'df_output.csv').as_posix(), gz=gz)


def test_null_first_line_df_export_import(tmp_path):
    my_dict = {"int": 1,
               "float": -0.01,
               "Obs1": pe.pseudo_Obs(87, 21, "test_ensemble"),
               "Obs2": pe.pseudo_Obs(-87, 21, "test_ensemble2")}
    my_df = pd.DataFrame([my_dict] * 4)
    my_df.loc[0, "Obs1"] = None
    my_df.loc[2, "Obs1"] = None
    for gz in [True, False]:
        pe.input.pandas.dump_df(my_df, (tmp_path / 'df_output').as_posix(), gz=gz)
        reconstructed_df = pe.input.pandas.load_df((tmp_path / 'df_output').as_posix(), auto_gamma=True, gz=gz)
        assert reconstructed_df.loc[0, "Obs1"] is None
        assert reconstructed_df.loc[2, "Obs1"] is None
        assert np.all(reconstructed_df.loc[1] == my_df.loc[1])
        assert np.all(reconstructed_df.loc[3] == my_df.loc[3])


def test_nan_df_export_import(tmp_path):
    my_dict = {"int": 1,
               "float": -0.01,
               "Obs1": pe.pseudo_Obs(87, 21, "test_ensemble"),
               "Obs2": pe.pseudo_Obs(-87, 21, "test_ensemble2")}
    my_df = pd.DataFrame([my_dict] * 4)
    my_df.loc[1, "int"] = np.nan

    for gz in [True, False]:
        with pytest.warns(UserWarning, match="nan value in column int will be replaced by None"):
            pe.input.pandas.dump_df(my_df, (tmp_path / 'df_output').as_posix(), gz=gz)
        reconstructed_df = pe.input.pandas.load_df((tmp_path / 'df_output').as_posix(), auto_gamma=True, gz=gz)
        assert reconstructed_df.loc[1, "int"] is None
        assert np.all(reconstructed_df.loc[:, "float"] == my_df.loc[:, "float"])
        assert np.all(reconstructed_df.loc[:, "Obs1"] == my_df.loc[:, "Obs1"])
        assert np.all(reconstructed_df.loc[:, "Obs2"] == my_df.loc[:, "Obs2"])


def test_null_second_line_df_export_import(tmp_path):
    my_dict = {"int": 1,
               "float": -0.01,
               "Obs1": pe.pseudo_Obs(87, 21, "test_ensemble"),
               "Obs2": pe.pseudo_Obs(-87, 21, "test_ensemble2")}
    my_df = pd.DataFrame([my_dict] * 4)
    my_df.loc[1, "Obs1"] = None
    for gz in [True, False]:
        pe.input.pandas.dump_df(my_df, (tmp_path / 'df_output').as_posix(), gz=gz)
        reconstructed_df = pe.input.pandas.load_df((tmp_path / 'df_output').as_posix(), auto_gamma=True, gz=gz)
        assert reconstructed_df.loc[1, "Obs1"] is None
        assert np.all(reconstructed_df.loc[0] == my_df.loc[0])
        assert np.all(reconstructed_df.loc[2:] == my_df.loc[2:])


def test_null_first_line_df_gzsql_export_import(tmp_path):
    my_dict = {"int": 1,
               "float": -0.01,
               "Obs1": pe.pseudo_Obs(87, 21, "test_ensemble"),
               "Obs2": pe.pseudo_Obs(-87, 21, "test_ensemble2")}

    my_df = pd.DataFrame([my_dict] * 4)
    my_df.loc[0, "Obs1"] = None
    my_df.loc[2, "Obs1"] = None
    gz = True
    pe.input.pandas.to_sql(my_df, 'test', (tmp_path / 'test.db').as_posix(), gz=gz)
    reconstructed_df = pe.input.pandas.read_sql('SELECT * FROM test', (tmp_path / 'test.db').as_posix(), auto_gamma=True)
    assert reconstructed_df.loc[0, "Obs1"] is None
    assert reconstructed_df.loc[2, "Obs1"] is None
    assert np.all(reconstructed_df.loc[1] == my_df.loc[1])
    assert np.all(reconstructed_df.loc[3] == my_df.loc[3])


def test_null_second_line_df_gzsql_export_import(tmp_path):
    my_dict = {"int": 1,
               "float": -0.01,
               "Obs1": pe.pseudo_Obs(87, 21, "test_ensemble"),
               "Obs2": pe.pseudo_Obs(-87, 21, "test_ensemble2")}

    my_df = pd.DataFrame([my_dict] * 4)
    my_df.loc[1, "Obs1"] = None
    gz = True
    pe.input.pandas.to_sql(my_df, 'test', (tmp_path / 'test.db').as_posix(), gz=gz)
    reconstructed_df = pe.input.pandas.read_sql('SELECT * FROM test', (tmp_path / 'test.db').as_posix(), auto_gamma=True)
    assert reconstructed_df.loc[1, "Obs1"] is None
    assert np.all(reconstructed_df.loc[0] == my_df.loc[0])
    assert np.all(reconstructed_df.loc[2:] == my_df.loc[2:])


def test_null_first_line_df_sql_export_import(tmp_path):
    my_dict = {"int": 1,
               "float": -0.01,
               "Obs1": pe.pseudo_Obs(87, 21, "test_ensemble"),
               "Obs2": pe.pseudo_Obs(-87, 21, "test_ensemble2")}

    my_df = pd.DataFrame([my_dict] * 4)
    my_df.loc[0, "Obs1"] = None
    my_df.loc[2, "Obs1"] = None
    gz = False
    pe.input.pandas.to_sql(my_df, 'test', (tmp_path / 'test.db').as_posix(), gz=gz)
    reconstructed_df = pe.input.pandas.read_sql('SELECT * FROM test', (tmp_path / 'test.db').as_posix(), auto_gamma=True)
    assert reconstructed_df.loc[0, "Obs1"] is None
    assert reconstructed_df.loc[2, "Obs1"] is None
    assert np.all(reconstructed_df.loc[1] == my_df.loc[1])
    assert np.all(reconstructed_df.loc[3] == my_df.loc[3])


def test_nan_sql_export_import(tmp_path):
    my_dict = {"int": 1,
               "float": -0.01,
               "Obs1": pe.pseudo_Obs(87, 21, "test_ensemble"),
               "Obs2": pe.pseudo_Obs(-87, 21, "test_ensemble2")}
    my_df = pd.DataFrame([my_dict] * 4)
    my_df.loc[1, "int"] = np.nan
    gz = False
    pe.input.pandas.to_sql(my_df, 'test', (tmp_path / 'test.db').as_posix(), gz=gz)
    reconstructed_df = pe.input.pandas.read_sql('SELECT * FROM test', (tmp_path / 'test.db').as_posix(), auto_gamma=True)
    assert np.isnan(reconstructed_df.loc[1, "int"])
    assert np.all(reconstructed_df.loc[:, "float"] == my_df.loc[:, "float"])
    assert np.all(reconstructed_df.loc[:, "Obs1"] == my_df.loc[:, "Obs1"])
    assert np.all(reconstructed_df.loc[:, "Obs2"] == my_df.loc[:, "Obs2"])


def test_nan_gzsql_export_import(tmp_path):
    my_dict = {"int": 1,
               "float": -0.01,
               "Obs1": pe.pseudo_Obs(87, 21, "test_ensemble"),
               "Obs2": pe.pseudo_Obs(-87, 21, "test_ensemble2")}
    my_df = pd.DataFrame([my_dict] * 4)
    my_df.loc[1, "int"] = np.nan
    gz = True
    pe.input.pandas.to_sql(my_df, 'test', (tmp_path / 'test.db').as_posix(), gz=gz)
    reconstructed_df = pe.input.pandas.read_sql('SELECT * FROM test', (tmp_path / 'test.db').as_posix(), auto_gamma=True)
    assert np.isnan(reconstructed_df.loc[1, "int"])
    assert np.all(reconstructed_df.loc[:, "float"] == my_df.loc[:, "float"])
    assert np.all(reconstructed_df.loc[:, "Obs1"] == my_df.loc[:, "Obs1"])
    assert np.all(reconstructed_df.loc[:, "Obs2"] == my_df.loc[:, "Obs2"])


def test_null_second_line_df_sql_export_import(tmp_path):
    my_dict = {"int": 1,
               "float": -0.01,
               "Obs1": pe.pseudo_Obs(87, 21, "test_ensemble"),
               "Obs2": pe.pseudo_Obs(-87, 21, "test_ensemble2")}

    my_df = pd.DataFrame([my_dict] * 4)
    my_df.loc[1, "Obs1"] = None
    gz = False
    pe.input.pandas.to_sql(my_df, 'test', (tmp_path / 'test.db').as_posix(), gz=gz)
    reconstructed_df = pe.input.pandas.read_sql('SELECT * FROM test', (tmp_path / 'test.db').as_posix(), auto_gamma=True)
    assert reconstructed_df.loc[1, "Obs1"] is None
    assert np.all(reconstructed_df.loc[0] == my_df.loc[0])
    assert np.all(reconstructed_df.loc[2:] == my_df.loc[2:])


def test_df_Corr(tmp_path):

    my_corr = pe.Corr([pe.pseudo_Obs(-0.48, 0.04, "test"), pe.pseudo_Obs(-0.154, 0.03, "test")])

    my_dict = {"int": 1,
               "float": -0.01,
               "Corr": my_corr}
    my_df = pd.DataFrame([my_dict] * 5)

    pe.input.pandas.dump_df(my_df, (tmp_path / 'df_output').as_posix())
@@ -76,8 +231,8 @@ def test_sql_if_exists_fail(tmp_path):

def test_Obs_list_sql(tmp_path):
    my_dict = {"int": 1,
               "Obs1": pe.pseudo_Obs(17, 11, "test_sql_if_exists_failnsemble"),
               "Obs_list": [[pe.pseudo_Obs(0.0, 0.1, "test_ensemble2"), pe.pseudo_Obs(3.2, 1.1, "test_ensemble2")]]}
    pe_df = pd.DataFrame(my_dict)
    my_db = (tmp_path / "test_db.sqlite").as_posix()
    pe.input.pandas.to_sql(pe_df, "My_table", my_db)
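Taken together, these tests pin down what the PR title promises: Obs columns with null entries now survive the SQL round trip. A minimal usage sketch, assuming a pyerrors build that includes this patch (table name 'my_table' and database file 'my.db' are illustrative):

import pandas as pd
import pyerrors as pe

# First row of the Obs column is null; before this patch the serializer
# only inspected row 0 and mis-detected the column type.
df = pd.DataFrame({"int": [1, 2],
                   "Obs": [None, pe.pseudo_Obs(87, 21, "test_ensemble")]})

pe.input.pandas.to_sql(df, "my_table", "my.db")
back = pe.input.pandas.read_sql("SELECT * FROM my_table", "my.db", auto_gamma=True)
assert back.loc[0, "Obs"] is None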