Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AERONET dates when parallel option used #125

Merged
merged 8 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions monetio/obs/aeronet.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,21 @@ def add_data(
interp_to_aod_values=interp_to_aod_values,
)

requested_parallel = n_procs > 1 or n_procs == -1
if has_joblib and requested_parallel:
# Split up by day
requested_parallel = n_procs != 1

# Split up by day
dates = pd.to_datetime(dates)
if dates is not None:
min_date = dates.min()
max_date = dates.max()
days = pd.date_range(start=min_date, end=max_date, freq="D") # TODO: subtract 1?
days1 = days + pd.Timedelta(days=1)
time_bounds = pd.date_range(start=min_date, end=max_date, freq="D")
if max_date not in time_bounds:
time_bounds = time_bounds.append(pd.DatetimeIndex([max_date]))

if has_joblib and requested_parallel and dates is not None and len(time_bounds) > 2:
dfs = Parallel(n_jobs=n_procs, verbose=verbose)(
delayed(_parallel_aeronet_call)(pd.DatetimeIndex([d1, d2]), **kwargs, freq=None)
for d1, d2 in zip(days, days1)
delayed(_parallel_aeronet_call)(pd.DatetimeIndex([t1, t2]), **kwargs, freq=None)
for t1, t2 in zip(time_bounds[:-1], time_bounds[1:])
)
df = pd.concat(dfs, ignore_index=True).drop_duplicates()
if freq is not None:
Expand Down Expand Up @@ -462,7 +467,7 @@ def add_data(
now = datetime.utcnow()
self.dates = pd.date_range(start=now.date(), end=now, freq="H")
else:
self.dates = dates
self.dates = pd.DatetimeIndex(dates)
if product is not None:
self.prod = product.upper()
else:
Expand Down
29 changes: 29 additions & 0 deletions tests/test_aeronet.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,32 @@ def test_interp_daily_with_pytspack():
df = aeronet.add_data(dates, daily=True, n_procs=1, interp_to_aod_values=standard_wavelengths)

assert {f"aod_{int(wl)}nm" for wl in standard_wavelengths}.issubset(df.columns)


@pytest.mark.parametrize(
"dates",
[
pd.to_datetime(["2019-09-01", "2019-09-02"]),
pd.to_datetime(["2019-09-01", "2019-09-03"]),
pd.to_datetime(["2019-09-01", "2019-09-01 12:00"]),
],
ids=[
"one day",
"two days",
"half day",
],
)
def test_issue100(dates, request):
df1 = aeronet.add_data(dates, n_procs=1)
df2 = aeronet.add_data(dates, n_procs=2)
assert len(df1) == len(df2)
if request.node.callspec.id == "two days":
# Sort first (can use `df1.compare(df2)` for debugging)
# Seems the sorting is site then time, not time then site
# which is why this is necessary
df1_ = df1.sort_values(["time", "siteid"]).reset_index(drop=True)
df2_ = df2.sort_values(["time", "siteid"]).reset_index(drop=True)
assert df1_.equals(df2_)
else:
assert df1.equals(df2)
assert dates[0] < df1.time.min() < df1.time.max() < dates[-1]
2 changes: 1 addition & 1 deletion tests/test_aqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


@pytest.mark.xfail(
not ssl_version < (2,), strict=True, reason="Doesn't work with newer OpenSSL", raises=SSLError
not ssl_version < (2,), strict=False, reason="Doesn't work with newer OpenSSL", raises=SSLError
)
def test_aqs():
# For MM data proc example
Expand Down