Skip to content

Commit

Permalink
Merge branch 'remove-empty-partitions' into fast-im-read
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffreyangus committed Aug 3, 2022
2 parents 4d90905 + d079969 commit ced5310
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions ludwig/data/dataframe/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
import dask
import dask.array as da
import dask.dataframe as dd
import ray
import ray.data
from dask.diagnostics import ProgressBar
from ray.data.block import Block, BlockAccessor
from ray.util.client.common import ClientObjectRef
from ray.util.dask import ray_dask_get

from ludwig.data.dataframe.base import DataFrameEngine
from ludwig.utils.data_utils import split_by_slices
Expand Down Expand Up @@ -61,6 +56,8 @@ def reset_index_across_all_partitions(df):

class DaskEngine(DataFrameEngine):
def __init__(self, parallelism=None, persist=True, _use_ray=True, **kwargs):
from ray.util.dask import ray_dask_get

self._parallelism = parallelism
self._persist = persist
if _use_ray:
Expand Down Expand Up @@ -141,6 +138,8 @@ def map_partitions(self, series, map_fn, meta=None):
return series.map_partitions(map_fn, meta=meta)

def map_batches(self, series, map_fn):
import ray.data

ds = ray.data.from_dask(series)
ds = ds.map_batches(map_fn, batch_format="pandas")
return ds.to_dask()
Expand Down

0 comments on commit ced5310

Please sign in to comment.