diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py
index ca127d8..ecb3ebe 100644
--- a/spatialpandas/dask.py
+++ b/spatialpandas/dask.py
@@ -1,4 +1,5 @@
 import dask.dataframe as dd
+from dask import delayed
 from dask.dataframe.partitionquantiles import partition_quantiles
 from spatialpandas.geometry.base import _BaseCoordinateIndexer, GeometryDtype
@@ -184,7 +185,8 @@ def pack_partitions(self, npartitions=None, p=15, shuffle='tasks'):
         return ddf
 
     def pack_partitions_to_parquet(
-            self, path, filesystem=None, npartitions=None, p=15, compression="snappy"
+            self, path, filesystem=None, npartitions=None, p=15, compression="snappy",
+            tempdir_format=None
     ):
         """
         Repartition and reorder dataframe spatially along a Hilbert space filling curve
@@ -201,7 +203,16 @@ def pack_partitions_to_parquet(
                 the length of the dataframe divided by 2**23.
             p: Hilbert curve p parameter
             compression: Compression algorithm for parquet file
-
+            tempdir_format: format string used to generate the filesystem path where
+                temporary files should be stored for each output partition. String
+                must contain a '{partition}' replacement field which will be formatted
+                using the output partition number as an integer. The string may
+                optionally contain a '{uuid}' replacement field which will be formatted
+                using a randomly generated UUID string. If None (the default),
+                temporary files are stored inside the output path.
+
+                These directories are deleted as soon as possible during the execution
+                of the function.
         Returns:
             DaskGeoDataFrame backed by newly written parquet dataset
         """
@@ -211,6 +222,19 @@ def pack_partitions_to_parquet(
         # Get fsspec filesystem object
         filesystem = validate_coerce_filesystem(path, filesystem)
 
+        # Compute tempdir_format string
+        dataset_uuid = str(uuid.uuid4())
+        if tempdir_format is None:
+            tempdir_format = os.path.join(path, "part.{partition}.parquet")
+        elif not isinstance(tempdir_format, str) or "{partition" not in tempdir_format:
+            raise ValueError(
+                "tempdir_format must be a string containing a {{partition}} "
+                "replacement field\n"
+                "    Received: {tempdir_format}".format(
+                    tempdir_format=repr(tempdir_format)
+                )
+            )
+
         # Compute number of output partitions
         npartitions = self._compute_packing_npartitions(npartitions)
         out_partitions = list(range(npartitions))
@@ -229,23 +253,37 @@ def pack_partitions_to_parquet(
             _partition=np.digitize(df.hilbert_distance, quantiles[1:], right=True))
         )
 
+        # Compute part paths
+        parts_tmp_paths = [
+            tempdir_format.format(partition=out_partition, uuid=dataset_uuid)
+            for out_partition in out_partitions
+        ]
+        part_output_paths = [
+            os.path.join(path, "part.%d.parquet" % out_partition)
+            for out_partition in out_partitions
+        ]
+
         # Initialize output partition directory structure
         filesystem.invalidate_cache()
         if filesystem.exists(path):
             filesystem.rm(path, recursive=True)
+
+        for tmp_path in parts_tmp_paths:
+            if filesystem.exists(tmp_path):
+                filesystem.rm(tmp_path, recursive=True)
+
         for out_partition in out_partitions:
             part_dir = os.path.join(path, "part.%d.parquet" % out_partition)
             filesystem.makedirs(part_dir, exist_ok=True)
 
         # Shuffle and write a parquet dataset for each output partition
         def process_partition(df):
-            uid = str(uuid.uuid4())
+            part_uuid = str(uuid.uuid4())
             for out_partition, df_part in df.groupby('_partition'):
                 part_path = os.path.join(
-                    path,
-                    "part.%d.parquet" % out_partition,
-                    'part.%s.parquet' % uid
+                    tempdir_format.format(partition=out_partition, uuid=dataset_uuid),
+                    'part.%s.parquet' % part_uuid
                 )
                 df_part = df_part.drop('_partition', axis=1).set_index(
                     'hilbert_distance', drop=True
@@ -253,24 +291,24 @@ def process_partition(df):
                 with filesystem.open(part_path, "wb") as f:
                     df_part.to_parquet(f, compression=compression, index=True)
-            return uid
+            return part_uuid
 
         ddf.map_partitions(
             process_partition,
             meta=pd.Series([], dtype='object')
         ).compute()
 
         # Concat parquet dataset per partition into parquet file per partition
-        def concat_parts_inplace(parts_path):
+        def concat_parts(parts_tmp_path, part_output_path):
             filesystem.invalidate_cache()
             # Load directory of parquet parts for this partition into a
             # single GeoDataFrame
-            if not filesystem.ls(parts_path):
+            if not filesystem.ls(parts_tmp_path):
                 # Empty partition
-                filesystem.rm(parts_path, recursive=True)
+                filesystem.rm(parts_tmp_path, recursive=True)
                 return None
             else:
-                part_df = read_parquet(parts_path, filesystem=filesystem)
+                part_df = read_parquet(parts_tmp_path, filesystem=filesystem)
 
             # Compute total_bounds for all geometry columns in part_df
             total_bounds = {}
@@ -280,7 +318,10 @@ def concat_parts_inplace(parts_path):
                     total_bounds[series_name] = series.total_bounds
 
             # Delete directory of parquet parts for partition
-            filesystem.rm(parts_path, recursive=True)
+            if filesystem.exists(parts_tmp_path):
+                filesystem.rm(parts_tmp_path, recursive=True)
+            if filesystem.exists(part_output_path):
+                filesystem.rm(part_output_path, recursive=True)
 
             # Sort by part_df by hilbert_distance index
             part_df.sort_index(inplace=True)
@@ -289,7 +330,7 @@ def concat_parts_inplace(parts_path):
            # constructing the full dataset _metadata file.
             md_list = []
             filesystem.invalidate_cache()
-            with filesystem.open(parts_path, 'wb') as f:
+            with filesystem.open(part_output_path, 'wb') as f:
                 pq.write_table(
                     pa.Table.from_pandas(part_df), f,
                     compression=compression, metadata_collector=md_list
@@ -298,20 +339,16 @@ def concat_parts_inplace(parts_path):
             # Return metadata and total_bounds for part
             return {"meta": md_list[0], "total_bounds": total_bounds}
 
-        part_paths = [
-            os.path.join(path, "part.%d.parquet" % out_partition)
-            for out_partition in out_partitions
-        ]
-
         write_info = dask.compute(*[
-            dask.delayed(concat_parts_inplace)(part_path) for part_path in part_paths
+            dask.delayed(concat_parts)(tmp_path, output_path)
+            for tmp_path, output_path in zip(parts_tmp_paths, part_output_paths)
         ])
 
         # Handle empty partitions.
         input_paths, write_info = zip(*[
-            (pp, wi) for (pp, wi) in zip(part_paths, write_info) if wi is not None
+            (pp, wi) for (pp, wi) in zip(part_output_paths, write_info) if wi is not None
         ])
-        output_paths = part_paths[:len(input_paths)]
+        output_paths = part_output_paths[:len(input_paths)]
         for p1, p2 in zip(input_paths, output_paths):
             if p1 != p2:
                 filesystem.move(p1, p2)
@@ -434,17 +471,21 @@ def _perform_get_item(self, covers_inds, overlaps_inds, x0, x1, y0, y1):
             # No partitions intersect with query region, return empty result
             return dd.from_pandas(self._obj._meta, npartitions=1)
 
-        result = self._obj.partitions[all_partition_inds]
+        @delayed
+        def cx_fn(df):
+            return df.cx[x0:x1, y0:y1]
 
-        def map_fn(df, ind):
-            if ind.iloc[0] in overlaps_inds:
-                return df.cx[x0:x1, y0:y1]
+        ddf = self._obj.partitions[all_partition_inds]
+        delayed_dfs = []
+        for partition_ind, delayed_df in zip(all_partition_inds, ddf.to_delayed()):
+            if partition_ind in overlaps_inds:
+                delayed_dfs.append(
+                    cx_fn(delayed_df)
+                )
             else:
-                return df
-        return result.map_partitions(
-            map_fn,
-            pd.Series(all_partition_inds, index=result.divisions[:-1]),
-        )
+                delayed_dfs.append(delayed_df)
+
+        return dd.from_delayed(delayed_dfs, meta=ddf._meta, divisions=ddf.divisions)
 
 
 class _DaskPartitionCoordinateIndexer(_BaseCoordinateIndexer):
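
Reviewer note, not part of the patch: a minimal usage sketch of the new tempdir_format argument introduced above. The output path, scratch location, partition count, and toy data are illustrative assumptions, and the PointArray constructor is quoted from memory of the spatialpandas geometry API; the {partition}/{uuid} replacement fields and the early cleanup of the scratch directories follow the docstring added in this hunk. The remaining hunks for spatialpandas/io/parquet.py and tests/test_parquet.py follow below.

    import dask.dataframe as dd
    from spatialpandas import GeoDataFrame
    from spatialpandas.geometry import PointArray

    # Toy frame; any spatialpandas GeoDataFrame with a geometry column works.
    df = GeoDataFrame({
        'geometry': PointArray([[0, 0], [1, 1], [2, 2], [3, 3]]),
        'value': [0, 1, 2, 3],
    })
    ddf = dd.from_pandas(df, npartitions=2)

    ddf_packed = ddf.pack_partitions_to_parquet(
        '/data/packed.parq',  # hypothetical output location
        npartitions=2,
        # Scratch dirs such as /scratch/tmp-<uuid>-part-001, removed as soon as
        # the corresponding output partition has been concatenated.
        tempdir_format='/scratch/tmp-{uuid}-part-{partition:03d}',
    )
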
diff --git a/spatialpandas/io/parquet.py b/spatialpandas/io/parquet.py
index cd1a3e8..e4f25f6 100644
--- a/spatialpandas/io/parquet.py
+++ b/spatialpandas/io/parquet.py
@@ -6,7 +6,8 @@ import pandas as pd
 from dask import delayed
 from dask.dataframe import (
-    to_parquet as dd_to_parquet, from_delayed, from_pandas
+    to_parquet as dd_to_parquet, read_parquet as dd_read_parquet,
+    from_delayed, from_pandas,
 )
 from dask.dataframe.utils import make_meta, clear_known_categories
@@ -257,13 +258,28 @@ def _perform_read_parquet_dask(
     else:
         partition_bounds = {}
 
-    # Create meta DataFrame
-    # Make categories unknown because we can't be sure all categories are present in
-    # the first partition.
-    meta = delayed(make_meta)(delayed_partitions[0]).compute()
-    meta = clear_known_categories(meta)
+    # Use Dask's read_parquet to get metadata
+    if columns is not None:
+        cols_no_index = [col for col in columns if col != "hilbert_distance"]
+    else:
+        cols_no_index = None
+
+    meta = dd_read_parquet(
+        paths[0],
+        columns=cols_no_index,
+        filesystem=filesystem,
+        engine='pyarrow',
+        gather_statistics=False
+    )._meta
+
+    # Import geometry columns in meta, not needed for pyarrow >= 0.16
+    metadata = _load_parquet_pandas_metadata(paths[0], filesystem=filesystem)
+    geom_cols = _get_geometry_columns(metadata)
+    if geom_cols:
+        meta = _import_geometry_columns(meta, geom_cols)
+    meta = GeoDataFrame(meta)
 
-    # Handle geometry
+    # Handle geometry in meta
     if geometry:
         meta = meta.set_geometry(geometry)
@@ -377,8 +393,8 @@ def _load_divisions(pqds):
     )
 
     mins, maxes = zip(*[
-        (rg.column(div_col).statistics.min, rg.column(12).statistics.max)
+        (rg.column(div_col).statistics.min, rg.column(div_col).statistics.max)
         for rg in row_groups
     ])
-    return mins, maxes
+    return list(mins), list(maxes)
diff --git a/tests/test_parquet.py b/tests/test_parquet.py
index 3f017be..fa11c33 100644
--- a/tests/test_parquet.py
+++ b/tests/test_parquet.py
@@ -1,4 +1,5 @@
 from hypothesis import given, settings
+import hypothesis.strategies as hs
 import dask.dataframe as dd
 import pandas as pd
 from spatialpandas import GeoSeries, GeoDataFrame
@@ -145,9 +146,12 @@ def test_pack_partitions(gp_multipoint, gp_multiline):
 @given(
     gp_multipoint=st_multipoint_array(min_size=10, max_size=40, geoseries=True),
     gp_multiline=st_multiline_array(min_size=10, max_size=40, geoseries=True),
+    use_temp_format=hs.booleans()
 )
 @settings(deadline=None, max_examples=30)
-def test_pack_partitions_to_parquet(gp_multipoint, gp_multiline, tmp_path):
+def test_pack_partitions_to_parquet(
+        gp_multipoint, gp_multiline, use_temp_format, tmp_path
+):
     # Build dataframe
     n = min(len(gp_multipoint), len(gp_multiline))
     df = GeoDataFrame({
@@ -158,7 +162,15 @@ def test_pack_partitions_to_parquet(gp_multipoint, gp_multiline, tmp_path):
     ddf = dd.from_pandas(df, npartitions=3)
 
     path = tmp_path / 'ddf.parq'
-    ddf_packed = ddf.pack_partitions_to_parquet(path, npartitions=4)
+    if use_temp_format:
+        tempdir_format = str(tmp_path / 'scratch' / 'part-{uuid}-{partition:03d}')
+    else:
+        tempdir_format = None
+
+    ddf_packed = ddf.pack_partitions_to_parquet(
+        path, npartitions=4,
+        tempdir_format=tempdir_format
+    )
 
     # Check the number of partitions (< 4 can happen in the case of empty partitions)
     assert ddf_packed.npartitions <= 4
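
A note on the _perform_get_item rewrite in spatialpandas/dask.py above: replacing map_partitions plus a per-partition index lookup with explicit delayed objects means partitions fully covered by the query box pass through untouched, only partially overlapping partitions pay for the .cx filter, and dd.from_delayed(..., divisions=...) keeps the sorted hilbert_distance divisions known. The sketch below shows the same pattern on a plain Dask DataFrame; the function names, column name, and filter predicate are stand-ins, not code from this repository.

    import pandas as pd
    import dask.dataframe as dd
    from dask import delayed

    @delayed
    def filter_partition(df):
        # Stand-in for the spatial filter df.cx[x0:x1, y0:y1]
        return df[df['x'] > 0]

    def filter_some_partitions(ddf, overlap_inds):
        # Filter only the partitions that need it; pass the rest through as-is.
        parts = [
            filter_partition(part) if i in overlap_inds else part
            for i, part in enumerate(ddf.to_delayed())
        ]
        # Supplying meta and the original divisions keeps the index structure known.
        return dd.from_delayed(parts, meta=ddf._meta, divisions=ddf.divisions)

    pdf = pd.DataFrame({'x': [-2, -1, 0, 1, 2, 3]}, index=range(6))
    result = filter_some_partitions(dd.from_pandas(pdf, npartitions=3), {1})
    print(result.compute())
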
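Finally, a sketch of the end-to-end flow these changes support: a dataset written by pack_partitions_to_parquet can be read back lazily with the existing read_parquet_dask reader (whose meta construction is what the spatialpandas/io/parquet.py hunks above adjust) and queried with the Dask-level .cx indexer, which can use the per-partition bounds recorded during packing to skip partitions that cannot intersect the query box. The path and query window are illustrative.

    from spatialpandas.io import read_parquet_dask

    # Hypothetical path written by the pack_partitions_to_parquet sketch earlier.
    ddf = read_parquet_dask('/data/packed.parq')

    # Partition pruning happens before any partition data is loaded; only the
    # surviving partitions are filtered row by row.
    subset = ddf.cx[0:100, 0:100]
    print(subset.npartitions, len(subset.compute()))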