Add tests for more complex sorting scenarios. (#167)
* Add tests for more complex sorting scenarios.

* Spacing.

* Add dataframe comparisons.
delucchi-cmu authored Nov 21, 2023
1 parent 9ed8952 commit c31fe68
Showing 3 changed files with 159 additions and 27 deletions.
2 changes: 1 addition & 1 deletion tests/.pylintrc
@@ -333,7 +333,7 @@ indent-string=' '
 max-line-length=110
 
 # Maximum number of lines in a module.
-max-module-lines=500
+max-module-lines=1000
 
 # Allow the body of a class to be on the same line as the declaration if body
 # contains single statement.
173 changes: 152 additions & 21 deletions tests/hipscat_import/catalog/test_map_reduce.py
@@ -1,7 +1,9 @@
"""Tests of map reduce operations"""

import os
from io import StringIO

import healpy as hp
import hipscat.pixel_math as hist
import numpy as np
import numpy.testing as npt
@@ -315,27 +317,6 @@ def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_p

     expected_ids = [*range(700, 831)]
     assert_parquet_file_ids(output_file, "id", expected_ids)
-    # expected_indexes = [
-    #     13598131468743213056,
-    #     13560933976658411520,
-    #     13561582046530240512,
-    #     13696722494273093632,
-    #     13588709332114997248,
-    #     13552942781667737600,
-    #     13601023174257934336,
-    #     13557123557418336256,
-    #     13591216801265483776,
-    #     13565852277582856192,
-    #     13553697461939208192,
-    #     13563711661973438464,
-    #     13590818251897569280,
-    #     13560168899495854080,
-    #     13557816572940124160,
-    #     13596001812279721984,
-    #     13564690156971098112,
-    #     13557377060258709504,
-    # ]
-    # assert_parquet_file_index(output_file, expected_indexes)
     data_frame = pd.read_parquet(output_file, engine="pyarrow")
     assert data_frame.index.name == "_hipscat_index"
     npt.assert_array_equal(
@@ -385,3 +366,153 @@ def test_reduce_bad_expectation(parquet_shards_dir, tmp_path):
id_column="id",
delete_input_files=False,
)


def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
"""Test reducing and requesting specific sort columns.
Logically, the input data has a mix of orderings in files, object IDs, and timestamps.
Each source is partitioned according to the linked object's radec, and so will be
ordered within the same hipscat_index value.
First, we take some time to set up these silly data points, then we test out
reducing them into a single parquet file using a mix of reduction options.
"""
+    os.makedirs(os.path.join(tmp_path, "reducing"))
+    shard_dir = os.path.join(tmp_path, "reduce_shards", "order_0", "dir_0", "pixel_11")
+    os.makedirs(shard_dir)
+    output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet")
+
+    file1_string = """source_id,object_id,time,ra,dec
+1200,700,3000,282.5,-58.5
+1201,700,4000,282.5,-58.5
+1402,702,3000,310.5,-27.5
+1403,702,3100,310.5,-27.5
+1404,702,3200,310.5,-27.5
+1505,703,4000,286.5,-69.5"""
+    file1_data = pd.read_csv(StringIO(file1_string))
+    file1_data.to_parquet(os.path.join(shard_dir, "file_1_shard_1.parquet"))
+
+    file2_string = """source_id,object_id,time,ra,dec
+1206,700,2000,282.5,-58.5
+1307,701,2200,299.5,-48.5
+1308,701,2100,299.5,-48.5
+1309,701,2000,299.5,-48.5"""
+    file2_data = pd.read_csv(StringIO(file2_string))
+    file2_data.to_parquet(os.path.join(shard_dir, "file_2_shard_1.parquet"))
+
+    combined_data = pd.concat([file1_data, file2_data])
+    combined_data["norder19_healpix"] = hp.ang2pix(
+        2 ** 19,
+        combined_data["ra"].values,
+        combined_data["dec"].values,
+        lonlat=True,
+        nest=True,
+    )
+    ## Use this to prune generated columns like Norder, Npix, and _hipscat_index
+    comparison_columns = ["source_id", "object_id", "time", "ra", "dec"]
+
+    ######################## Sort option 1: by source_id
+    ## This will sort WITHIN an order 19 healpix pixel. In that ordering, the objects are
+    ## (703, 700, 701, 702)
+    mr.reduce_pixel_shards(
+        cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
+        resume_path=tmp_path,
+        reducing_key="0_11",
+        destination_pixel_order=0,
+        destination_pixel_number=11,
+        destination_pixel_size=10,
+        output_path=tmp_path,
+        ra_column="ra",
+        dec_column="dec",
+        id_column="source_id",
+        delete_input_files=False,
+    )
+
+    ## sort order is effectively (norder19 healpix, source_id)
+    data_frame = pd.read_parquet(output_file, engine="pyarrow")
+    expected_dataframe = combined_data.sort_values(["norder19_healpix", "source_id"])
+    pd.testing.assert_frame_equal(
+        expected_dataframe[comparison_columns].reset_index(drop=True),
+        data_frame[comparison_columns].reset_index(drop=True),
+    )
+    assert_parquet_file_ids(
+        output_file,
+        "source_id",
+        [1505, 1200, 1201, 1206, 1307, 1308, 1309, 1402, 1403, 1404],
+        resort_ids=False,
+    )
+
+    assert_parquet_file_ids(
+        output_file,
+        "object_id",
+        [703, 700, 700, 700, 701, 701, 701, 702, 702, 702],
+        resort_ids=False,
+    )
+
+    ######################## Sort option 2: by object id and time
+    ## sort order is effectively (norder19 healpix, object id, time)
+    mr.reduce_pixel_shards(
+        cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
+        resume_path=tmp_path,
+        reducing_key="0_11",
+        destination_pixel_order=0,
+        destination_pixel_number=11,
+        destination_pixel_size=10,
+        output_path=tmp_path,
+        ra_column="ra",
+        dec_column="dec",
+        id_column=["object_id", "time"],
+        delete_input_files=False,
+    )
+
+    data_frame = pd.read_parquet(output_file, engine="pyarrow")
+    expected_dataframe = combined_data.sort_values(["norder19_healpix", "object_id", "time"])
+    pd.testing.assert_frame_equal(
+        expected_dataframe[comparison_columns].reset_index(drop=True),
+        data_frame[comparison_columns].reset_index(drop=True),
+    )
+    assert_parquet_file_ids(
+        output_file,
+        "source_id",
+        [1505, 1206, 1200, 1201, 1309, 1308, 1307, 1402, 1403, 1404],
+        resort_ids=False,
+    )
+    assert_parquet_file_ids(
+        output_file,
+        "time",
+        [4000, 2000, 3000, 4000, 2000, 2100, 2200, 3000, 3100, 3200],
+        resort_ids=False,
+    )
+
+    ######################## Sort option 3: by object id and time WITHOUT hipscat index.
+    ## The 1500 block of ids goes back to the end, because we're not using
+    ## spatial properties for sorting, only numeric.
+    ## sort order is effectively (object id, time)
+    mr.reduce_pixel_shards(
+        cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
+        resume_path=tmp_path,
+        reducing_key="0_11",
+        destination_pixel_order=0,
+        destination_pixel_number=11,
+        destination_pixel_size=10,
+        output_path=tmp_path,
+        ra_column="ra",
+        dec_column="dec",
+        id_column=["object_id", "time"],
+        add_hipscat_index=False,
+        delete_input_files=False,
+    )
+
+    data_frame = pd.read_parquet(output_file, engine="pyarrow")
+    expected_dataframe = combined_data.sort_values(["object_id", "time"])
+    pd.testing.assert_frame_equal(
+        expected_dataframe[comparison_columns].reset_index(drop=True),
+        data_frame[comparison_columns].reset_index(drop=True),
+    )
+    assert_parquet_file_ids(
+        output_file,
+        "source_id",
+        [1206, 1200, 1201, 1309, 1308, 1307, 1402, 1403, 1404, 1505],
+        resort_ids=False,
+    )
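
For intuition about the orderings asserted above: the _hipscat_index leads with the order 19 healpix pixel of each row's position, so any requested sort columns only break ties within a pixel. A small standalone sketch (not part of this commit; it assumes only healpy and pandas, and the expected outputs are taken from the test's own comments and assertions):

import healpy as hp
import pandas as pd

# One row per distinct object in the test data above.
objects = pd.DataFrame(
    {
        "object_id": [700, 701, 702, 703],
        "ra": [282.5, 299.5, 310.5, 286.5],
        "dec": [-58.5, -48.5, -27.5, -69.5],
    }
)
# The same order 19 pixel computation the test uses.
objects["norder19_healpix"] = hp.ang2pix(
    2 ** 19, objects["ra"].values, objects["dec"].values, lonlat=True, nest=True
)
# Sort options 1 and 2 (spatial key first): object 703 leads.
print(objects.sort_values("norder19_healpix")["object_id"].tolist())
# [703, 700, 701, 702]
# Sort option 3 (add_hipscat_index=False, numeric key only): 703 trails.
print(objects.sort_values("object_id")["object_id"].tolist())
# [700, 701, 702, 703]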
11 changes: 6 additions & 5 deletions tests/hipscat_import/conftest.py
@@ -198,7 +198,7 @@ def basic_data_shard_df():
     )
 
     test_df["margin_pixel"] = hp.ang2pix(
-        2**3,
+        2 ** 3,
         test_df["weird_ra"].values,
         test_df["weird_dec"].values,
         lonlat=True,
@@ -230,7 +230,7 @@ def polar_data_shard_df():
     )
 
     test_df["margin_pixel"] = hp.ang2pix(
-        2**3,
+        2 ** 3,
         test_df["weird_ra"].values,
         test_df["weird_dec"].values,
         lonlat=True,
@@ -279,7 +279,7 @@ def assert_text_file_matches(expected_lines, file_name):

 @pytest.fixture
 def assert_parquet_file_ids():
-    def assert_parquet_file_ids(file_name, id_column, expected_ids):
+    def assert_parquet_file_ids(file_name, id_column, expected_ids, resort_ids=True):
         """
         Convenience method to read a parquet file and compare the object IDs to
         a list of expected objects.
@@ -294,8 +294,9 @@ def assert_parquet_file_ids(file_name, id_column, expected_ids):
         data_frame = pd.read_parquet(file_name, engine="pyarrow")
         assert id_column in data_frame.columns
         ids = data_frame[id_column].tolist()
-        ids.sort()
-        expected_ids.sort()
+        if resort_ids:
+            ids.sort()
+            expected_ids.sort()
 
         assert len(ids) == len(
             expected_ids
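
The new resort_ids flag switches the fixture between set-like and order-sensitive comparison. A minimal standalone sketch of the semantics (check_ids is a hypothetical, condensed stand-in for the fixture's inner function; it assumes a parquet file containing the given column):

import pandas as pd

def check_ids(file_name, id_column, expected_ids, resort_ids=True):
    # Hypothetical stand-in for the assert_parquet_file_ids fixture.
    ids = pd.read_parquet(file_name, engine="pyarrow")[id_column].tolist()
    if resort_ids:
        # Default: sort both sides, so only membership and multiplicity matter.
        ids.sort()
        expected_ids.sort()
    # With resort_ids=False, the exact row order must match as well.
    assert ids == expected_ids

The sorting tests above pass resort_ids=False so that a correctly ordered output file is distinguished from one that merely contains the right rows.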
