Skip to content

Commit 199582a

Browse files
committed
Fix vy
1 parent a3b24cd commit 199582a

File tree

6 files changed

+28
-45
lines changed

6 files changed

+28
-45
lines changed

mars/dataframe/datasource/from_vineyard.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,13 @@
2020
from ...core.context import get_context
2121
from ...serialization.serializables import Int32Field, StringField
2222
from ...tensor.datasource.from_vineyard import resolve_vineyard_socket
23-
from ...utils import calc_nsplits, has_unknown_shape
23+
from ...utils import calc_nsplits, has_unknown_shape, lazy_import
2424
from ..operands import DataFrameOperand, DataFrameOperandMixin
2525
from ..utils import parse_index
2626

2727

28-
try:
29-
import vineyard
30-
from vineyard.data.utils import normalize_dtype, from_json
31-
except ImportError:
32-
vineyard = None
28+
vineyard = lazy_import("vineyard")
29+
vy_data_utils = lazy_import("vineyard.data.utils", rename="vy_data_utils")
3330

3431

3532
class DataFrameFromVineyard(DataFrameOperand, DataFrameOperandMixin):
@@ -130,15 +127,15 @@ def execute(cls, ctx, op):
130127
chunks, dtypes = [], None
131128
for idx in range(meta["partitions_-size"]):
132129
chunk_meta = meta["partitions_-%d" % idx]
133-
columns = pd.Index(from_json(chunk_meta["columns_"]))
130+
columns = pd.Index(vy_data_utils.from_json(chunk_meta["columns_"]))
134131
shape = (np.nan, len(columns))
135132
if not chunk_meta.islocal:
136133
continue
137134
if dtypes is None:
138135
dtypes = []
139136
for idx in range(len(columns)):
140137
column_meta = chunk_meta["__values_-value-%d" % idx]
141-
dtype = normalize_dtype(
138+
dtype = vy_data_utils.normalize_dtype(
142139
column_meta["value_type_"],
143140
column_meta.get("value_type_meta_", None),
144141
)

mars/dataframe/datastore/tests/test_datastore_execution.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@
3535
import fastparquet
3636
except ImportError:
3737
fastparquet = None
38-
try:
39-
import vineyard
40-
except ImportError:
41-
vineyard = None
4238

4339
from .... import dataframe as md
4440
from ....tests.core import flaky

mars/dataframe/datastore/to_vineyard.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919
from ...core import OutputType
2020
from ...serialization.serializables import FieldTypes, StringField, TupleField
2121
from ...tensor.datastore.to_vineyard import resolve_vineyard_socket
22+
from ...utils import lazy_import
2223
from ..operands import DataFrameOperand, DataFrameOperandMixin
2324
from ..utils import parse_index
2425

25-
try:
26-
import vineyard
27-
from vineyard.data.dataframe import make_global_dataframe
28-
from vineyard.data.utils import to_json
29-
except ImportError:
30-
vineyard = None
26+
vineyard = lazy_import("vineyard")
27+
vy_data_df = lazy_import("vineyard.data.dataframe", rename="vy_data_df")
28+
vy_data_utils = lazy_import("vineyard.data.utils", rename="vy_data_utils")
3129

3230

3331
class DataFrameToVineyardChunk(DataFrameOperand, DataFrameOperandMixin):
@@ -130,7 +128,7 @@ def execute(cls, ctx, op):
130128
new_meta.add_member(k, v)
131129
else:
132130
new_meta[k] = v
133-
new_meta["partition_index_"] = to_json(op.inputs[0].index)
131+
new_meta["partition_index_"] = vy_data_utils.to_json(op.inputs[0].index)
134132
df_id = client.create_metadata(new_meta).id
135133

136134
client.persist(df_id)
@@ -185,7 +183,7 @@ def execute(cls, ctx, op):
185183
# # store the result object id to execution context
186184
chunks = [ctx[chunk.key][0][0] for chunk in op.inputs]
187185
ctx[op.outputs[0].key] = pd.DataFrame(
188-
{0: [make_global_dataframe(client, chunks).id]}
186+
{0: [vy_data_df.make_global_dataframe(client, chunks).id]}
189187
)
190188

191189

mars/storage/vineyard.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,29 @@
2727
from .core import BufferWrappedFileObject, StorageFileObject
2828

2929
vineyard = lazy_import("vineyard")
30+
vy_data_pickle = lazy_import("vineyard.data.pickle", rename="vy_data_pickle")
31+
vy_data_utils = lazy_import("vineyard.data.utils", rename="vy_data_utils")
3032
pyarrow = lazy_import("pyarrow")
3133

3234
if sys.platform.startswith("win"):
33-
vineyard = None
35+
vineyard = vy_data_pickle = vy_data_utils = None
3436

3537
logger = logging.getLogger(__name__)
3638

37-
3839
# Setup support for mars datatypes on vineyard
3940

4041

4142
def mars_sparse_matrix_builder(client, value, builder, **kw):
4243
meta = vineyard.ObjectMeta()
4344
meta["typename"] = "vineyard::SparseMatrix<%s>" % value.dtype.name
44-
meta["shape_"] = vineyard.data.utils.to_json(value.shape)
45+
meta["shape_"] = vy_data_utils.to_json(value.shape)
4546
meta.add_member("spmatrix", builder.run(client, value.spmatrix, **kw))
4647
return client.create_metadata(meta)
4748

4849

4950
def mars_sparse_matrix_resolver(obj, resolver) -> sparse.SparseNDArray:
5051
meta = obj.meta
51-
shape = vineyard.data.utils.from_json(meta["shape_"])
52+
shape = vy_data_utils.from_json(meta["shape_"])
5253
spmatrix = resolver.run(obj.member("spmatrix"))
5354
return sparse.matrix.SparseMatrix(spmatrix, shape=shape)
5455

@@ -78,17 +79,13 @@ def __init__(
7879
super().__init__(object_id, mode, size=size)
7980

8081
def _read_init(self):
81-
import vineyard.data.pickle
82-
83-
self._reader = vineyard.data.pickle.PickledReader(
82+
self._reader = vy_data_pickle.PickledReader(
8483
self._client.get(self._object_id)
8584
)
8685
self._size = self._reader.store_size
8786

8887
def _write_init(self):
89-
import vineyard.data.pickle
90-
91-
self._writer = vineyard.data.pickle.PickledWriter(self._size)
88+
self._writer = vy_data_pickle.PickledWriter(self._size)
9289

9390
@property
9491
def buffer(self):

mars/tensor/datasource/from_vineyard.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,13 @@
1818
from ... import opcodes as OperandDef
1919
from ...serialization.serializables import Int32Field, StringField
2020
from ...storage.base import StorageLevel
21-
from ...utils import calc_nsplits, has_unknown_shape
21+
from ...utils import calc_nsplits, has_unknown_shape, lazy_import
2222
from ...core.context import get_context
2323
from ..operands import TensorOperand, TensorOperandMixin
2424
from .core import TensorNoInput
2525

26-
try:
27-
import vineyard
28-
from vineyard.data.utils import normalize_dtype
29-
except ImportError:
30-
vineyard = None
26+
vineyard = lazy_import("vineyard")
27+
vy_data_utils = lazy_import("vineyard.data.utils", rename="vy_data_utils")
3128

3229

3330
def resolve_vineyard_socket(ctx, op):
@@ -94,7 +91,7 @@ def execute(cls, ctx, op):
9491
chunk_meta = meta["partitions_-%d" % idx]
9592
if not chunk_meta.islocal:
9693
continue
97-
dtype = normalize_dtype(
94+
dtype = vy_data_utils.normalize_dtype(
9895
chunk_meta["value_type_"], chunk_meta.get("value_type_meta_", None)
9996
)
10097
shape = tuple(json.loads(chunk_meta["shape_"]))

mars/tensor/datastore/to_vineyard.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919
from ...core.operand.base import SchedulingHint
2020
from ...serialization.serializables import FieldTypes, KeyField, StringField, TupleField
2121
from ...storage.base import StorageLevel
22+
from ...utils import lazy_import
2223
from ..datasource import tensor as astensor
2324
from .core import TensorDataStore
2425

25-
try:
26-
import vineyard
27-
from vineyard.data.tensor import make_global_tensor
28-
from vineyard.data.utils import to_json
29-
except ImportError:
30-
vineyard = None
26+
vineyard = lazy_import("vineyard")
27+
vy_data_tensor = lazy_import("vineyard.data.tensor", rename="vy_data_tensor")
28+
vy_data_utils = lazy_import("vineyard.data.utils", rename="vy_data_utils")
3129

3230

3331
def resolve_vineyard_socket(ctx, op) -> Tuple[str, bool]:
@@ -120,7 +118,7 @@ def execute(cls, ctx, op):
120118
new_meta.add_member(k, v)
121119
else:
122120
new_meta[k] = v
123-
new_meta["partition_index_"] = to_json(op.inputs[0].index)
121+
new_meta["partition_index_"] = vy_data_utils.to_json(op.inputs[0].index)
124122
tensor_id = client.create_metadata(new_meta).id
125123

126124
client.persist(tensor_id)
@@ -168,7 +166,7 @@ def execute(cls, ctx, op):
168166
# # store the result object id to execution context
169167
chunks = [ctx[chunk.key][0] for chunk in op.inputs]
170168
holder = np.empty((1,), dtype=object)
171-
holder[0] = make_global_tensor(client, chunks).id
169+
holder[0] = vy_data_tensor.make_global_tensor(client, chunks).id
172170
ctx[op.outputs[0].key] = holder
173171

174172

0 commit comments

Comments
 (0)