Add temp_format argument to pack_partitions_to_parquet (#22)
* Add temp_format argument to pack_partitions_to_parquet
to control temporary directory paths

* Support DaskGeoDataFrame.cx on frame without known divisions

* Fix load_divisions=True

* Use Dask's read_parquet to compute metadata; this is more efficient than reading the first partition
jonmmease authored Jan 11, 2020
1 parent 3e2e85a commit d4e4297
Showing 3 changed files with 110 additions and 41 deletions.
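
A minimal usage sketch of the new argument, assuming the method signature and template fields shown in the diff below; the dataframe ddf and both paths are illustrative:

    # Pack a dask-backed GeoDataFrame into a spatially sorted parquet dataset,
    # staging temporary per-partition files under a caller-chosen scratch
    # location instead of inside the output path.
    ddf_packed = ddf.pack_partitions_to_parquet(
        'output/dataset.parq',                                 # final dataset
        npartitions=4,
        tempdir_format='scratch/part-{uuid}-{partition:03d}',  # temp dirs, deleted when done
    )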
101 changes: 71 additions & 30 deletions spatialpandas/dask.py
@@ -1,4 +1,5 @@
import dask.dataframe as dd
from dask import delayed
from dask.dataframe.partitionquantiles import partition_quantiles

from spatialpandas.geometry.base import _BaseCoordinateIndexer, GeometryDtype
@@ -184,7 +185,8 @@ def pack_partitions(self, npartitions=None, p=15, shuffle='tasks'):
return ddf

def pack_partitions_to_parquet(
self, path, filesystem=None, npartitions=None, p=15, compression="snappy"
self, path, filesystem=None, npartitions=None, p=15, compression="snappy",
tempdir_format=None
):
"""
Repartition and reorder dataframe spatially along a Hilbert space filling curve
@@ -201,7 +203,16 @@ def pack_partitions_to_parquet(
the length of the dataframe divided by 2**23.
p: Hilbert curve p parameter
compression: Compression algorithm for parquet file
tempdir_format: format string used to generate the filesystem path where
temporary files should be stored for each output partition. String
must contain a '{partition}' replacement field which will be formatted
using the output partition number as an integer. The string may
optionally contain a '{uuid}' replacement field which will be formatted
using a randomly generated UUID string. If None (the default),
temporary files are stored inside the output path.
These directories are deleted as soon as possible during the execution
of the function.
Returns:
DaskGeoDataFrame backed by newly written parquet dataset
"""
@@ -211,6 +222,19 @@
# Get fsspec filesystem object
filesystem = validate_coerce_filesystem(path, filesystem)

# Compute tempdir_format string
dataset_uuid = str(uuid.uuid4())
if tempdir_format is None:
tempdir_format = os.path.join(path, "part.{partition}.parquet")
elif not isinstance(tempdir_format, str) or "{partition" not in tempdir_format:
raise ValueError(
"tempdir_format must be a string containing a {{partition}} "
"replacement field\n"
" Received: {tempdir_format}".format(
tempdir_format=repr(tempdir_format)
)
)

# Compute number of output partitions
npartitions = self._compute_packing_npartitions(npartitions)
out_partitions = list(range(npartitions))
@@ -229,48 +253,62 @@
_partition=np.digitize(df.hilbert_distance, quantiles[1:], right=True))
)

# Compute part paths
parts_tmp_paths = [
tempdir_format.format(partition=out_partition, uuid=dataset_uuid)
for out_partition in out_partitions
]
part_output_paths = [
os.path.join(path, "part.%d.parquet" % out_partition)
for out_partition in out_partitions
]

# Initialize output partition directory structure
filesystem.invalidate_cache()
if filesystem.exists(path):
filesystem.rm(path, recursive=True)

for tmp_path in parts_tmp_paths:
if filesystem.exists(tmp_path):
filesystem.rm(tmp_path, recursive=True)


for out_partition in out_partitions:
part_dir = os.path.join(path, "part.%d.parquet" % out_partition)
filesystem.makedirs(part_dir, exist_ok=True)

# Shuffle and write a parquet dataset for each output partition
def process_partition(df):
uid = str(uuid.uuid4())
part_uuid = str(uuid.uuid4())
for out_partition, df_part in df.groupby('_partition'):
part_path = os.path.join(
path,
"part.%d.parquet" % out_partition,
'part.%s.parquet' % uid
tempdir_format.format(partition=out_partition, uuid=dataset_uuid),
'part.%s.parquet' % part_uuid
)
df_part = df_part.drop('_partition', axis=1).set_index(
'hilbert_distance', drop=True
)

with filesystem.open(part_path, "wb") as f:
df_part.to_parquet(f, compression=compression, index=True)
return uid
return part_uuid

ddf.map_partitions(
process_partition, meta=pd.Series([], dtype='object')
).compute()

# Concat parquet dataset per partition into parquet file per partition
def concat_parts_inplace(parts_path):
def concat_parts(parts_tmp_path, part_output_path):
filesystem.invalidate_cache()

# Load directory of parquet parts for this partition into a
# single GeoDataFrame
if not filesystem.ls(parts_path):
if not filesystem.ls(parts_tmp_path):
# Empty partition
filesystem.rm(parts_path, recursive=True)
filesystem.rm(parts_tmp_path, recursive=True)
return None
else:
part_df = read_parquet(parts_path, filesystem=filesystem)
part_df = read_parquet(parts_tmp_path, filesystem=filesystem)

# Compute total_bounds for all geometry columns in part_df
total_bounds = {}
@@ -280,7 +318,10 @@ def concat_parts_inplace(parts_path):
total_bounds[series_name] = series.total_bounds

# Delete directory of parquet parts for partition
filesystem.rm(parts_path, recursive=True)
if filesystem.exists(parts_tmp_path):
filesystem.rm(parts_tmp_path, recursive=True)
if filesystem.exists(part_output_path):
filesystem.rm(part_output_path, recursive=True)

# Sort part_df by hilbert_distance index
part_df.sort_index(inplace=True)
@@ -289,7 +330,7 @@ def concat_parts_inplace(parts_path):
# constructing the full dataset _metadata file.
md_list = []
filesystem.invalidate_cache()
with filesystem.open(parts_path, 'wb') as f:
with filesystem.open(part_output_path, 'wb') as f:
pq.write_table(
pa.Table.from_pandas(part_df),
f, compression=compression, metadata_collector=md_list
@@ -298,20 +339,16 @@ def concat_parts_inplace(parts_path):
# Return metadata and total_bounds for part
return {"meta": md_list[0], "total_bounds": total_bounds}

part_paths = [
os.path.join(path, "part.%d.parquet" % out_partition)
for out_partition in out_partitions
]

write_info = dask.compute(*[
dask.delayed(concat_parts_inplace)(part_path) for part_path in part_paths
dask.delayed(concat_parts)(tmp_path, output_path)
for tmp_path, output_path in zip(parts_tmp_paths, part_output_paths)
])

# Handle empty partitions.
input_paths, write_info = zip(*[
(pp, wi) for (pp, wi) in zip(part_paths, write_info) if wi is not None
(pp, wi) for (pp, wi) in zip(part_output_paths, write_info) if wi is not None
])
output_paths = part_paths[:len(input_paths)]
output_paths = part_output_paths[:len(input_paths)]
for p1, p2 in zip(input_paths, output_paths):
if p1 != p2:
filesystem.move(p1, p2)
@@ -434,17 +471,21 @@ def _perform_get_item(self, covers_inds, overlaps_inds, x0, x1, y0, y1):
# No partitions intersect with query region, return empty result
return dd.from_pandas(self._obj._meta, npartitions=1)

result = self._obj.partitions[all_partition_inds]
@delayed
def cx_fn(df):
return df.cx[x0:x1, y0:y1]

def map_fn(df, ind):
if ind.iloc[0] in overlaps_inds:
return df.cx[x0:x1, y0:y1]
ddf = self._obj.partitions[all_partition_inds]
delayed_dfs = []
for partition_ind, delayed_df in zip(all_partition_inds, ddf.to_delayed()):
if partition_ind in overlaps_inds:
delayed_dfs.append(
cx_fn(delayed_df)
)
else:
return df
return result.map_partitions(
map_fn,
pd.Series(all_partition_inds, index=result.divisions[:-1]),
)
delayed_dfs.append(delayed_df)

return dd.from_delayed(delayed_dfs, meta=ddf._meta, divisions=ddf.divisions)


class _DaskPartitionCoordinateIndexer(_BaseCoordinateIndexer):
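
The .cx change above replaces map_partitions with a result assembled from per-partition delayed objects, which also works when divisions are unknown. A rough sketch of that pattern in plain Dask terms, with illustrative names rather than the spatialpandas API:

    import dask.dataframe as dd
    from dask import delayed

    def select_partitions(ddf, keep_inds, overlaps_inds, filter_fn):
        # keep_inds: positions of partitions that intersect the query box;
        # overlaps_inds: the subset that still needs row-level filtering.
        filter_delayed = delayed(filter_fn)
        sub = ddf.partitions[list(keep_inds)]
        parts = [
            filter_delayed(part) if ind in overlaps_inds else part
            for ind, part in zip(keep_inds, sub.to_delayed())
        ]
        return dd.from_delayed(parts, meta=sub._meta, divisions=sub.divisions)

Only the partially overlapping partitions pay for a row-level filter; fully covered partitions pass through unchanged.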
34 changes: 25 additions & 9 deletions spatialpandas/io/parquet.py
@@ -6,7 +8,8 @@
import pandas as pd
from dask import delayed
from dask.dataframe import (
to_parquet as dd_to_parquet, from_delayed, from_pandas
to_parquet as dd_to_parquet, read_parquet as dd_read_parquet,
from_delayed, from_pandas,
)
from dask.dataframe.utils import make_meta, clear_known_categories

@@ -257,13 +258,28 @@ def _perform_read_parquet_dask(
else:
partition_bounds = {}

# Create meta DataFrame
# Make categories unknown because we can't be sure all categories are present in
# the first partition.
meta = delayed(make_meta)(delayed_partitions[0]).compute()
meta = clear_known_categories(meta)
# Use Dask's read_parquet to get metadata
if columns is not None:
cols_no_index = [col for col in columns if col != "hilbert_distance"]
else:
cols_no_index = None

meta = dd_read_parquet(
paths[0],
columns=cols_no_index,
filesystem=filesystem,
engine='pyarrow',
gather_statistics=False
)._meta

# Import geometry columns in meta, not needed for pyarrow >= 0.16
metadata = _load_parquet_pandas_metadata(paths[0], filesystem=filesystem)
geom_cols = _get_geometry_columns(metadata)
if geom_cols:
meta = _import_geometry_columns(meta, geom_cols)
meta = GeoDataFrame(meta)

# Handle geometry
# Handle geometry in meta
if geometry:
meta = meta.set_geometry(geometry)

@@ -377,8 +393,8 @@ def _load_divisions(pqds):
)

mins, maxes = zip(*[
(rg.column(div_col).statistics.min, rg.column(12).statistics.max)
(rg.column(div_col).statistics.min, rg.column(div_col).statistics.max)
for rg in row_groups
])

return mins, maxes
return list(mins), list(maxes)
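
A brief sketch of the metadata trick from the last commit bullet, assuming the Dask API of early 2020 (gather_statistics was later removed) and a hypothetical path: read_parquet builds its empty _meta frame from the parquet schema, so no partition data has to be read just to learn column names and dtypes.

    import dask.dataframe as dd

    meta = dd.read_parquet(
        'output/dataset.parq/part.0.parquet',  # hypothetical piece of the dataset
        engine='pyarrow',
        gather_statistics=False,               # schema only, skip row-group statistics
    )._meta                                    # zero-row DataFrame with the right dtypes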
16 changes: 14 additions & 2 deletions tests/test_parquet.py
@@ -1,4 +1,5 @@
from hypothesis import given, settings
import hypothesis.strategies as hs
import dask.dataframe as dd
import pandas as pd
from spatialpandas import GeoSeries, GeoDataFrame
@@ -145,9 +146,12 @@ def test_pack_partitions(gp_multipoint, gp_multiline):
@given(
gp_multipoint=st_multipoint_array(min_size=10, max_size=40, geoseries=True),
gp_multiline=st_multiline_array(min_size=10, max_size=40, geoseries=True),
use_temp_format=hs.booleans()
)
@settings(deadline=None, max_examples=30)
def test_pack_partitions_to_parquet(gp_multipoint, gp_multiline, tmp_path):
def test_pack_partitions_to_parquet(
gp_multipoint, gp_multiline, use_temp_format, tmp_path
):
# Build dataframe
n = min(len(gp_multipoint), len(gp_multiline))
df = GeoDataFrame({
@@ -158,7 +162,15 @@ def test_pack_partitions_to_parquet(gp_multipoint, gp_multiline, tmp_path):
ddf = dd.from_pandas(df, npartitions=3)

path = tmp_path / 'ddf.parq'
ddf_packed = ddf.pack_partitions_to_parquet(path, npartitions=4)
if use_temp_format:
tempdir_format = str(tmp_path / 'scratch' / 'part-{uuid}-{partition:03d}')
else:
tempdir_format = None

ddf_packed = ddf.pack_partitions_to_parquet(
path, npartitions=4,
tempdir_format=tempdir_format
)

# Check the number of partitions (< 4 can happen in the case of empty partitions)
assert ddf_packed.npartitions <= 4
