[WIP][Scheduling] Add worker node failover with lineage #3307

Open · wants to merge 4 commits into base: master
2 changes: 2 additions & 0 deletions mars/deploy/oscar/base_config.yml
@@ -57,6 +57,8 @@ scheduling:
# Max number of concurrent speculative run for a subtask.
max_concurrent_run: 3
subtask_cancel_timeout: 5
failover:
enable_lineage: no
metrics:
backend: console
# If backend is prometheus, then we can add prometheus config as follows:
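
The new enable_lineage flag uses YAML boolean literals, so "no"/"yes" load as Python booleans when the config file is parsed. A minimal sketch of that parsing (assuming PyYAML, which Mars already uses for these config files):

import yaml

snippet = """
scheduling:
  failover:
    enable_lineage: no
"""
config = yaml.safe_load(snippet)
# YAML 1.1 treats no/yes as booleans, so lineage-based failover stays off by default
assert config["scheduling"]["failover"]["enable_lineage"] is False
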
5 changes: 4 additions & 1 deletion mars/deploy/oscar/ray.py
@@ -632,7 +632,10 @@ async def stop(self):
for worker_address in self._worker_addresses:
await stop_worker(worker_address, self._config)
for pool in self._worker_pools:
await pool.actor_pool.remote("stop")
try:
await pool.actor_pool.remote("stop")
except:
pass
if self._supervisor_pool is not None:
await stop_supervisor(self.supervisor_address, self._config)
await self._supervisor_pool.actor_pool.remote("stop")
8 changes: 8 additions & 0 deletions mars/deploy/oscar/tests/fault_injection_config_with_fo.yml
@@ -0,0 +1,8 @@
"@inherits": '@default'
third_party_modules:
- mars.services.tests.fault_injection_patch
scheduling:
subtask_max_retries: 2
subtask_max_reschedules: 2
failover:
enable_lineage: yes
28 changes: 1 addition & 27 deletions mars/deploy/oscar/tests/test_fault_injection.py
@@ -23,12 +23,12 @@
from ....oscar.errors import ServerClosed
from ....remote import spawn
from ....services.tests.fault_injection_manager import (
AbstractFaultInjectionManager,
ExtraConfigKey,
FaultInjectionError,
FaultInjectionUnhandledError,
FaultPosition,
FaultType,
create_fault_injection_manager,
)
from ....tensor.base.psrs import PSRSConcatPivot
from ..local import new_cluster
@@ -54,32 +54,6 @@ async def fault_cluster(request):
yield client


async def create_fault_injection_manager(
session_id, address, fault_count, fault_type, fault_op_types=None
):
class FaultInjectionManager(AbstractFaultInjectionManager):
def __init__(self):
self._fault_count = fault_count

def set_fault_count(self, count):
self._fault_count = count

def get_fault_count(self):
return self._fault_count

def get_fault(self, pos: FaultPosition, ctx=None) -> FaultType:
# Check op types if fault_op_types provided.
if fault_op_types and type(ctx.get("operand")) not in fault_op_types:
return FaultType.NoFault
if self._fault_count.get(pos, 0) > 0:
self._fault_count[pos] -= 1
return fault_type
return FaultType.NoFault

await FaultInjectionManager.create(session_id, address)
return FaultInjectionManager.name


@pytest.mark.parametrize(
"fault_and_exception",
[
114 changes: 114 additions & 0 deletions mars/deploy/oscar/tests/test_ray_error_recovery.py
@@ -0,0 +1,114 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import numpy as np
import pandas as pd
import pytest

from ....oscar.errors import ServerClosed
from ....remote import spawn
from ....services.tests.fault_injection_manager import (
FaultType,
FaultPosition,
create_fault_injection_manager,
ExtraConfigKey,
)
from ....tests.core import require_ray
from ....utils import lazy_import

from .... import tensor as mt
from .... import dataframe as md
from ..ray import new_cluster

ray = lazy_import("ray")

CONFIG_FILE = os.path.join(os.path.dirname(__file__), "local_test_with_ray_config.yml")
FAULT_INJECTION_CONFIG_FILE = os.path.join(
os.path.dirname(__file__), "fault_injection_config_with_fo.yml"
)


@pytest.fixture
async def fault_ray_cluster(request):
param = getattr(request, "param", {})
client = await new_cluster(
config=param.get("config", CONFIG_FILE),
worker_num=2,
worker_cpu=2,
)
async with client:
yield client


@pytest.mark.parametrize(
"fault_ray_cluster", [{"config": FAULT_INJECTION_CONFIG_FILE}], indirect=True
)
@pytest.mark.parametrize(
"fault_config",
[
[
FaultType.ProcessExit,
{FaultPosition.ON_RUN_SUBTASK: 1},
pytest.raises(ServerClosed),
["_UnretryableException", "*"],
],
],
)
@require_ray
@pytest.mark.asyncio
async def test_node_failover(fault_ray_cluster, fault_config):
fault_type, fault_count, expect_raises, exception_match = fault_config
name = await create_fault_injection_manager(
session_id=fault_ray_cluster.session.session_id,
address=fault_ray_cluster.session.address,
fault_count=fault_count,
fault_type=fault_type,
)

columns = list("ABCDEFGHIJ")
width = len(columns)

df1 = md.DataFrame(
mt.random.randint(
1,
100,
size=(100, width),
chunk_size=(20, width),
),
columns=columns,
)
df2 = df1.execute()
pd1 = df2.to_pandas()

df3 = df2.rechunk(chunk_size=(10, width))
df4 = df3.execute()

def f(x):
return x + 1

r = spawn(f, args=(1,), retry_when_fail=False)
with expect_raises:
r.execute(extra_config={ExtraConfigKey.FAULT_INJECTION_MANAGER_NAME: name})

df5 = df4.apply(
f,
axis=1,
dtypes=pd.Series([np.dtype(np.int64)] * width, index=columns),
output_type="dataframe",
)
df6 = df5.execute()
pd2 = df6.to_pandas()

pd.testing.assert_frame_equal(pd1 + 1, pd2)
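
To run only this new test locally, something like the following should work (a sketch; it assumes a development install of Mars with Ray and the pytest async plugins available, otherwise the @require_ray marker skips the test):

import pytest

pytest.main([
    "mars/deploy/oscar/tests/test_ray_error_recovery.py::test_node_failover",
    "-v",
])
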
13 changes: 10 additions & 3 deletions mars/oscar/backends/core.py
@@ -66,7 +66,8 @@ async def _listen(self, client: Client):
# close failed, ignore it
pass
raise ServerClosed(
f"Remote server {client.dest_address} closed"
f"Remote server {client.dest_address} closed",
address=client.dest_address,
) from None
future = self._client_to_message_futures[client].pop(message.message_id)
future.set_result(message)
@@ -94,7 +95,10 @@ async def _listen(self, client: Client):

message_futures = self._client_to_message_futures.get(client)
self._client_to_message_futures[client] = dict()
error = ServerClosed(f"Remote server {client.dest_address} closed")
error = ServerClosed(
f"Remote server {client.dest_address} closed",
address=client.dest_address,
)
for future in message_futures.values():
future.set_exception(copy.copy(error))

@@ -119,7 +123,10 @@ async def call(
except ConnectionError:
# close failed, ignore it
pass
raise ServerClosed(f"Remote server {client.dest_address} closed")
raise ServerClosed(
f"Remote server {client.dest_address} closed",
address=client.dest_address,
)

if not wait:
r = wait_response
3 changes: 2 additions & 1 deletion mars/oscar/backends/ray/communication.py
@@ -505,7 +505,8 @@ async def __on_ray_recv__(self, channel_id: ChannelID, message):
if self.stopped:
raise ServerClosed(
f"Remote server {self.address} closed, but got message {message} "
f"from channel {channel_id}"
f"from channel {channel_id}",
address=self.address,
)
channel = self._channels.get(channel_id)
if not channel:
5 changes: 4 additions & 1 deletion mars/oscar/backends/ray/pool.py
@@ -246,7 +246,10 @@ async def __on_ray_recv__(self, channel_id: ChannelID, message):
"""Method for communication based on ray actors"""
try:
if self._ray_server is None:
raise ServerClosed(f"Remote server {channel_id.dest_address} closed")
raise ServerClosed(
f"Remote server {channel_id.dest_address} closed",
address=channel_id.dest_address,
)
return await self._ray_server.__on_ray_recv__(channel_id, message)
except Exception: # pragma: no cover
return RayChannelException(*sys.exc_info())
14 changes: 14 additions & 0 deletions mars/oscar/errors.py
@@ -46,6 +46,20 @@ class SlotStateError(MarsError):


class ServerClosed(MarsError):
def __init__(self, *args, **kwargs):
self._address = kwargs.pop("address", None)
super().__init__(*args, **kwargs)

@property
def address(self):
return self._address


class DataNotExist(MarsError):
pass


class DuplicatedSubtaskError(MarsError):
pass
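
A small sketch of what the new address keyword enables for callers; the address value here is made up:

from mars.oscar.errors import ServerClosed

try:
    raise ServerClosed("Remote server 127.0.0.1:12345 closed", address="127.0.0.1:12345")
except ServerClosed as ex:
    # the supervisor can now tell which node died and reassign its queued subtasks
    assert ex.address == "127.0.0.1:12345"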


16 changes: 11 additions & 5 deletions mars/services/cluster/supervisor/node_info.py
@@ -140,20 +140,26 @@ def get_all_bands(
statuses = statuses or {NodeStatus.READY}
role = role or NodeRole.WORKER
nodes = self._role_to_nodes.get(role, [])
return self.get_bands(nodes, statuses)

def get_bands(
self, addresses: List[str], statuses: Set[NodeStatus] = None
) -> Dict[BandType, Resource]:
statuses = statuses or {NodeStatus.READY}
band_resource = dict()
for node in nodes:
if self._node_infos[node].status not in statuses:
for address in addresses:
if self._node_infos[address].status not in statuses:
continue
node_resource = self._node_infos[node].resource
node_resource = self._node_infos[address].resource
for resource_type, info in node_resource.items():
if resource_type.startswith("numa"):
# cpu
band_resource[(node, resource_type)] = Resource(
band_resource[(address, resource_type)] = Resource(
num_cpus=info["cpu_total"], mem_bytes=info["memory_total"]
)
else: # pragma: no cover
assert resource_type.startswith("gpu")
band_resource[(node, resource_type)] = Resource(
band_resource[(address, resource_type)] = Resource(
num_gpus=info["gpu_total"]
)
return band_resource
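
For reference, the mapping returned by get_bands keys each band by an (address, resource name) tuple. A hedged sketch of its shape with made-up values, assuming Resource is the same dataclass from mars.resource used elsewhere for band resources:

from mars.resource import Resource

band_resource = {
    ("10.0.0.1:7777", "numa-0"): Resource(num_cpus=8, mem_bytes=16 * 1024 ** 3),
    ("10.0.0.1:7777", "gpu-0"): Resource(num_gpus=1),
}
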
18 changes: 18 additions & 0 deletions mars/services/context.py
@@ -295,3 +295,21 @@ def wrap(*args, **kwargs):
return fut.result()

return wrap


class _FailOverContext:
def __init__(self):
self._enable_lineage = False
self.subtask_to_dependency_subtasks = defaultdict(set)

def enable_lineage(self):
self._enable_lineage = True

def is_lineage_enabled(self):
return self._enable_lineage

def cleanup(self):
self.subtask_to_dependency_subtasks.clear()


FailOverContext = _FailOverContext()
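
A minimal usage sketch of the module-level singleton; the subtask ids are made up, and the real entries recorded by the task service are presumably Subtask objects rather than strings:

from mars.services.context import FailOverContext

FailOverContext.enable_lineage()
assert FailOverContext.is_lineage_enabled()

# record that subtask "b" consumes data produced by subtask "a" (hypothetical ids)
FailOverContext.subtask_to_dependency_subtasks["b"].add("a")

FailOverContext.cleanup()
assert not FailOverContext.subtask_to_dependency_subtasks
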
3 changes: 2 additions & 1 deletion mars/services/lifecycle/supervisor/tests/test_tracker.py
@@ -18,10 +18,11 @@
from ..... import oscar as mo
from ..... import tensor as mt
from .....core import tile
from .....oscar.errors import DataNotExist
from ....cluster import MockClusterAPI
from ....meta import MockMetaAPI
from ....session import MockSessionAPI
from ....storage import MockStorageAPI, DataNotExist
from ....storage import MockStorageAPI
from ....task.supervisor.manager import TaskManagerActor
from ... import TileableNotTracked
from ...supervisor.tracker import LifecycleTrackerActor
13 changes: 11 additions & 2 deletions mars/services/scheduling/api/oscar.py
@@ -71,7 +71,10 @@ async def get_subtask_schedule_summaries(
return await self._manager_ref.get_schedule_summaries(task_id)

async def add_subtasks(
self, subtasks: List[Subtask], priorities: Optional[List[Tuple]] = None
self,
subtasks: List[Subtask],
priorities: Optional[List[Tuple]] = None,
schedule_lost_objects: bool = False,
):
"""
Submit subtasks into scheduling service
@@ -85,7 +88,9 @@ async def add_subtasks(
"""
if priorities is None:
priorities = [subtask.priority or tuple() for subtask in subtasks]
await self._manager_ref.add_subtasks(subtasks, priorities)
await self._manager_ref.add_subtasks(
subtasks, priorities, schedule_lost_objects
)

@mo.extensible
async def update_subtask_priority(self, subtask_id: str, priority: Tuple):
@@ -155,6 +160,10 @@ async def try_enable_autoscale_in(self):
`disable_autoscale_in` has been invoked."""
await self._autoscaler.try_enable_autoscale_in()

@property
def address(self):
return self._address


class MockSchedulingAPI(SchedulingAPI):
@classmethod
2 changes: 2 additions & 0 deletions mars/services/scheduling/supervisor/assigner.py
@@ -216,6 +216,8 @@ async def assign_subtasks(
async def reassign_subtasks(
self, band_to_queued_num: Dict[BandType, int]
) -> Dict[BandType, int]:
# Note: bands may change
self._update_bands(list(await self._cluster_api.get_all_bands(NodeRole.WORKER)))
move_queued_subtasks = {}
for is_gpu in (False, True):
band_name_prefix = "numa" if not is_gpu else "gpu"
74 changes: 65 additions & 9 deletions mars/services/scheduling/supervisor/manager.py
@@ -19,11 +19,14 @@
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Union

from ... import NodeRole
from ...cluster.core import NodeStatus
from ...cluster.supervisor.node_info import NodeInfoCollectorActor
from .... import oscar as mo
from ....lib.aio import alru_cache
from ....metrics import Metrics
from ....oscar.backends.context import ProfilingContext
from ....oscar.errors import MarsError
from ....oscar.errors import MarsError, ServerClosed, DuplicatedSubtaskError
from ....oscar.profiling import ProfilingData, MARS_ENABLE_PROFILING
from ....typing import BandType
from ....utils import dataslots, Timer
@@ -123,14 +126,24 @@ async def __post_create__(self):
)
await self._speculation_execution_scheduler.start()

self._node_info_ref = await mo.actor_ref(
NodeInfoCollectorActor.default_uid(),
address=self.address,
)

async def __pre_destroy__(self):
await self._speculation_execution_scheduler.stop()

@alru_cache
async def _get_task_api(self):
return await TaskAPI.create(self._session_id, self.address)

async def add_subtasks(self, subtasks: List[Subtask], priorities: List[Tuple]):
async def add_subtasks(
self,
subtasks: List[Subtask],
priorities: List[Tuple],
schedule_lost_objects: bool = False,
):
async with redirect_subtask_errors(self, subtasks):
for subtask in subtasks:
# the extra_config may be None. the extra config overwrites the default value.
@@ -142,7 +155,9 @@ async def add_subtasks(self, subtasks: List[Subtask], priorities: List[Tuple]):
if subtask_max_reschedules is None:
subtask_max_reschedules = self._subtask_max_reschedules
if subtask.subtask_id in self._subtask_infos: # pragma: no cover
raise KeyError(f"Subtask {subtask.subtask_id} already added.")
raise DuplicatedSubtaskError(
f"Subtask {subtask.subtask_id} already added."
)
self._subtask_infos[subtask.subtask_id] = SubtaskScheduleInfo(
subtask, max_reschedules=subtask_max_reschedules
)
@@ -161,7 +176,9 @@ async def add_subtasks(self, subtasks: List[Subtask], priorities: List[Tuple]):
)
)
await self._queueing_ref.add_subtasks(
[subtask for subtask in subtasks if not subtask.virtual], priorities
[subtask for subtask in subtasks if not subtask.virtual],
priorities,
schedule_lost_objects=schedule_lost_objects,
)
await self._queueing_ref.submit_subtasks.tell()

@@ -177,7 +194,7 @@ async def finish_subtasks(
bands: List[BandType] = None,
schedule_next: bool = True,
):
logger.debug("Finished subtasks %s.", subtask_ids)
logger.debug("Finished subtasks %s in bands %s.", subtask_ids, bands)
band_tasks = defaultdict(lambda: 0)
bands = bands or [None] * len(subtask_ids)
for subtask_id, subtask_band in zip(subtask_ids, bands):
@@ -202,7 +219,7 @@ async def finish_subtasks(
await aio_task
if schedule_next:
band_tasks[subtask_band] += 1
if subtask_info.band_futures:
if subtask_band is not None and subtask_info.band_futures:
# Cancel subtask here won't change subtask status.
# See more in `TaskProcessorActor.set_subtask_result`
logger.info(
@@ -268,7 +285,12 @@ async def submit_subtask_to_band(self, subtask_id: str, band: BandType):
"stage_id": subtask_info.subtask.stage_id,
},
)
logger.debug("Start run subtask %s in band %s.", subtask_id, band)
logger.debug(
"Start run subtask %s of stage %s in band %s.",
subtask_id,
subtask_info.subtask.stage_id,
band,
)
with Timer() as timer:
task = asyncio.create_task(
execution_ref.run_subtask.options(
@@ -283,10 +305,44 @@ async def submit_subtask_to_band(self, subtask_id: str, band: BandType):
subtask_info.subtask, band, timer.duration
)
task_api = await self._get_task_api()
logger.debug("Finished subtask %s with result %s.", subtask_id, result)
logger.debug(
"Finished subtask %s of stage %s with result %s in band %s.",
subtask_id,
subtask_info.subtask.stage_id,
result.status,
band,
)
await task_api.set_subtask_result(result)
except (OSError, MarsError) as ex:
# TODO: We should handle ServerClosed Error.
if isinstance(ex, ServerClosed):
# 1. reassign subtasks of closed node
band_resource = await self._node_info_ref.get_bands([ex.address])
logger.debug("Got bands %s of node %s.", band_resource, ex.address)
for band in band_resource.keys():
reassigned_queue = await self._queueing_ref.get_band_queue(band)
reassigned_subtasks = []
priorities = []
for item in reassigned_queue:
reassigned_subtasks.append(item.subtask)
priorities.append(item.subtask.priority)
await self._queueing_ref.add_subtasks(
reassigned_subtasks,
priorities,
exclude_bands=set([band]),
)
await self._queueing_ref.remove_queue_from_band(band)
logger.info(
"Reassigned %d subtasks of band %s to other nodes.",
len(reassigned_subtasks),
band,
)
# 2. mark closed node stopped
await self._node_info_ref.update_node_info(
address=ex.address,
role=NodeRole.WORKER,
status=NodeStatus.STOPPED,
)
logger.info("Node %s marked as stopped.", ex.address)
if (
subtask_info.subtask.retryable
and subtask_info.num_reschedules < subtask_info.max_reschedules
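
The ServerClosed branch above drains every queue that belongs to a band of the dead node and re-enqueues those subtasks elsewhere, excluding the dead band. A self-contained, purely illustrative simulation of the drain step with plain dicts instead of actor refs (all names and values are invented):

def drain_dead_node(band_queues, dead_address):
    """Pop every band queue that lives on the dead node and return its subtasks."""
    reassigned = {}
    for band in [b for b in band_queues if b[0] == dead_address]:
        # a band is an (address, resource name) tuple such as ("10.0.0.1:7777", "numa-0")
        reassigned[band] = band_queues.pop(band)
    return reassigned

queues = {
    ("10.0.0.1:7777", "numa-0"): ["subtask-1", "subtask-2"],
    ("10.0.0.2:7777", "numa-0"): ["subtask-3"],
}
moved = drain_dead_node(queues, "10.0.0.1:7777")
assert moved == {("10.0.0.1:7777", "numa-0"): ["subtask-1", "subtask-2"]}
assert ("10.0.0.1:7777", "numa-0") not in queues
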
52 changes: 49 additions & 3 deletions mars/services/scheduling/supervisor/queueing.py
@@ -44,9 +44,20 @@ def __lt__(self, other: "HeapItem"):
return self.priority > other.priority


@dataslots
@dataclass
class LostHeapItem:
subtask: Subtask
priority: Tuple

def __lt__(self, other: "LostHeapItem"):
return self.priority < other.priority


class SubtaskQueueingActor(mo.Actor):
_stid_to_bands: DefaultDict[str, List[Tuple]]
_stid_to_items: Dict[str, HeapItem]
_lost_subtask_items: List[LostHeapItem]
_band_queues: DefaultDict[Tuple, List[HeapItem]]

@classmethod
@@ -57,6 +68,7 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None):
self._session_id = session_id
self._stid_to_bands = defaultdict(list)
self._stid_to_items = dict()
self._lost_subtask_queue = list()
# Note that we need to ensure top item in every band heap queue is valid,
# so that we can ensure band queue is busy if the band queue is not empty.
self._band_queues = defaultdict(list)
@@ -167,7 +179,27 @@ async def add_subtasks(
priorities: List[Tuple],
exclude_bands: Set[Tuple] = None,
random_when_unavailable: bool = True,
schedule_lost_objects: bool = False,
):
if schedule_lost_objects:
for subtask, priority in zip(subtasks, priorities):
if subtask not in self._lost_subtask_queue:
heap_item = self._stid_to_items[subtask.subtask_id] = LostHeapItem(
subtask, priority
)
heapq.heappush(self._lost_subtask_queue, heap_item)
logger.info(
"Subtask %s with priority %s enqueued to lost subtask queue.",
subtask.subtask_id,
priority,
)
else:
logger.info(
"Subtask %s has been in lost subtask queue, so do not enqueue it again.",
subtask.subtask_id,
)
return

bands = await self._assigner_ref.assign_subtasks(
subtasks, exclude_bands, random_when_unavailable
)
@@ -206,7 +238,7 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
self._band_to_resource[band].num_cpus
or self._band_to_resource[band].num_gpus
)
task_queue = self._band_queues[band]
task_queue = self._lost_subtask_queue or self._band_queues.get(band, list())
submit_items = dict()
while (
self._ensure_top_item_valid(task_queue)
@@ -251,8 +283,11 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
submitted_bands, submit_items_list, submitted_ids_list
):
subtask_ids = list(submit_items)
task_queue = self._band_queues[band]

task_queue = (
self._lost_subtask_queue
if isinstance(list(submit_items.values())[0], LostHeapItem)
else self._band_queues[band]
)
async with redirect_subtask_errors(
self, [item.subtask for item in submit_items.values()]
):
@@ -323,6 +358,12 @@ async def all_bands_busy(self) -> bool:
return all(len(self._band_queues[band]) > 0 for band in bands)
return False

def get_band_queue(self, band: str):
return self._band_queues.get(band, list())

def remove_queue_from_band(self, band: str):
self._band_queues.pop(band, None)

async def balance_queued_subtasks(self):
# record length of band queues
band_num_queued_subtasks = {
@@ -342,6 +383,11 @@ async def balance_queued_subtasks(self):
item = heapq.heappop(task_queue)
self._stid_to_bands[item.subtask.subtask_id].remove(band)
items.append(item)
logger.debug(
"Removed subtask %s from band %s.",
item.subtask.subtask_id,
band,
)
elif move > 0:
item = items.pop()
self._stid_to_bands[item.subtask.subtask_id].append(band)
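
The two heap item types sort in opposite directions: HeapItem inverts __lt__ so heapq pops the highest priority first, while LostHeapItem keeps the natural order so lost subtasks come out lowest priority first (presumably so earlier-generation subtasks are regenerated before their consumers). A self-contained sketch of that difference, with the dataclass fields trimmed down to the priority tuple:

import heapq
from dataclasses import dataclass
from typing import Tuple

@dataclass
class HeapItem:
    priority: Tuple
    def __lt__(self, other):
        return self.priority > other.priority  # inverted: heapq acts as a max-heap

@dataclass
class LostHeapItem:
    priority: Tuple
    def __lt__(self, other):
        return self.priority < other.priority  # natural: heapq acts as a min-heap

band_queue, lost_queue = [], []
for p in [(2,), (0,), (1,)]:
    heapq.heappush(band_queue, HeapItem(p))
    heapq.heappush(lost_queue, LostHeapItem(p))

assert heapq.heappop(band_queue).priority == (2,)  # band queue: highest priority first
assert heapq.heappop(lost_queue).priority == (0,)  # lost-object queue: lowest first
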
6 changes: 6 additions & 0 deletions mars/services/scheduling/supervisor/service.py
@@ -15,6 +15,7 @@
import asyncio

from .... import oscar as mo
from ...context import FailOverContext
from ...core import AbstractService
from .autoscale import AutoscalerActor
from .manager import DEFAULT_SUBTASK_MAX_RESCHEDULES
@@ -126,6 +127,11 @@ async def create_session(self, session_id: str):
)
await autoscaler_ref.register_session(session_id, self._address)

# Initialize failover context
failover_config = scheduling_config.get("failover", {})
if failover_config.get("enable_lineage"):
FailOverContext.enable_lineage()

async def destroy_session(self, session_id: str):
from .queueing import SubtaskQueueingActor
from .manager import SubtaskManagerActor
1 change: 1 addition & 0 deletions (file path not rendered in this view)
@@ -53,6 +53,7 @@ def add_subtasks(
priorities: List[Tuple],
exclude_bands: Set[Tuple] = None,
random_when_unavailable: bool = True,
schedule_lost_objects: bool = False,
):
if self._error is not None:
raise self._error
4 changes: 2 additions & 2 deletions mars/services/scheduling/worker/execution.py
@@ -28,7 +28,7 @@
from ....core.operand import Fetch, FetchShuffle
from ....lib.aio import alru_cache
from ....metrics import Metrics
from ....oscar.errors import MarsError
from ....oscar.errors import MarsError, DuplicatedSubtaskError
from ....storage import StorageLevel
from ....utils import dataslots, get_chunk_key_to_data_keys, wrap_exception
from ...cluster import ClusterAPI
@@ -519,7 +519,7 @@ async def run_subtask(
self, subtask: Subtask, band_name: str, supervisor_address: str
):
if subtask.subtask_id in self._subtask_info: # pragma: no cover
raise Exception(
raise DuplicatedSubtaskError(
f"Subtask {subtask.subtask_id} is already running on this band[{self.address}]."
)
logger.debug(
1 change: 0 additions & 1 deletion mars/services/storage/__init__.py
@@ -14,4 +14,3 @@

from .api import StorageAPI, MockStorageAPI, WebStorageAPI
from .core import DataInfo
from .errors import DataNotExist
3 changes: 2 additions & 1 deletion mars/services/storage/core.py
@@ -21,12 +21,13 @@
from ... import oscar as mo
from ...lib.aio import AioFileObject
from ...oscar.backends.allocate_strategy import IdleLabel, NoIdleSlot
from ...oscar.errors import DataNotExist
from ...resource import cuda_card_stats
from ...storage import StorageLevel, get_storage_backend
from ...storage.base import ObjectInfo, StorageBackend
from ...storage.core import StorageFileObject
from ...utils import dataslots
from .errors import DataNotExist, StorageFull
from .errors import StorageFull

logger = logging.getLogger(__name__)

4 changes: 0 additions & 4 deletions mars/services/storage/errors.py
@@ -13,10 +13,6 @@
# limitations under the License.

from ...core.base import MarsError
from ...storage.errors import DataNotExist


DataNotExist = DataNotExist


class NoDataToSpill(MarsError):
3 changes: 2 additions & 1 deletion mars/services/storage/handler.py
@@ -18,6 +18,7 @@
from typing import Any, Dict, List, Union

from ... import oscar as mo
from ...oscar.errors import DataNotExist
from ...storage import StorageLevel, get_storage_backend
from ...storage.core import StorageFileObject
from ...typing import BandType
@@ -31,7 +32,7 @@
build_data_info,
WrappedStorageFileObject,
)
from .errors import DataNotExist, NoDataToSpill
from .errors import NoDataToSpill

cupy = lazy_import("cupy")
cudf = lazy_import("cudf")
2 changes: 1 addition & 1 deletion mars/services/storage/tests/test_transfer.py
@@ -22,9 +22,9 @@

from .... import oscar as mo
from ....oscar.backends.allocate_strategy import IdleLabel
from ....oscar.errors import DataNotExist
from ....storage import StorageLevel
from ..core import DataManagerActor, StorageManagerActor, StorageQuotaActor
from ..errors import DataNotExist
from ..handler import StorageHandlerActor
from ..transfer import ReceiverManagerActor, SenderManagerActor

4 changes: 4 additions & 0 deletions mars/services/task/core.py
@@ -33,6 +33,7 @@ class TaskStatus(Enum):
pending = 0
running = 1
terminated = 2
errored = 3


class Task(Serializable):
@@ -41,6 +42,7 @@ class Task(Serializable):
tileable_graph: TileableGraph = ReferenceField("tileable_graph", TileableGraph)
fuse_enabled: bool = BoolField("fuse_enabled")
extra_config: dict = DictField("extra_config")
counter: object = AnyField("counter")

def __init__(
self,
@@ -49,13 +51,15 @@ def __init__(
tileable_graph: TileableGraph = None,
fuse_enabled: bool = True,
extra_config: dict = None,
counter: object = None,
):
super().__init__(
task_id=task_id,
session_id=session_id,
tileable_graph=tileable_graph,
fuse_enabled=fuse_enabled,
extra_config=extra_config,
counter=counter,
)
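
The counter passed into each Task is the single itertools.count created per session in TaskManagerActor, so successive stages can draw monotonically increasing generation orders from it. The draw itself happens in the stage.py changes, which are not rendered in this view, so the following only illustrates the shared-counter mechanism:

import itertools

counter = itertools.count()  # one counter shared by every Task of a session

first_stage_order = next(counter)   # 0
second_stage_order = next(counter)  # 1
assert second_stage_order > first_stage_order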


4 changes: 4 additions & 0 deletions mars/services/task/execution/api.py
@@ -215,6 +215,10 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
def get_stage_processors(self):
"""Get stage processors."""

@abstractmethod
def get_stage_generation_order(self, stage_id: str):
"""Get stage generation order."""


_name_to_task_executor_cls: Dict[str, Type[TaskExecutor]] = {}

65 changes: 32 additions & 33 deletions mars/services/task/execution/mars/executor.py
@@ -15,6 +15,7 @@
import asyncio
import logging
import sys
import weakref
from collections import defaultdict
from typing import Dict, List, Optional, Set

@@ -94,6 +95,7 @@ def __init__(

self._stage_processors = []
self._stage_tile_progresses = []
self._stage_id_to_processor = weakref.WeakValueDictionary()
self._cur_stage_processor = None
self._result_tileables_lifecycle = None
self._subtask_decref_events = dict()
@@ -192,6 +194,7 @@ async def execute_subtask_graph(
await self._incref_stage(stage_processor)
await self._resource_evaluator.evaluate(stage_processor)
self._stage_processors.append(stage_processor)
self._stage_id_to_processor[stage_id] = stage_processor
self._cur_stage_processor = stage_processor
# get the tiled progress for current stage
prev_progress = sum(self._stage_tile_progresses)
@@ -250,50 +253,43 @@ async def cancel(self):
await self._cur_stage_processor.cancel()

async def set_subtask_result(self, subtask_result: SubtaskResult):
if self._cur_stage_processor is None or (
subtask_result.stage_id
and self._cur_stage_processor.stage_id != subtask_result.stage_id
):
logger.warning(
"Stage %s for subtask %s not exists, got stale subtask result %s which may be "
"speculative execution from previous stages, just ignore it.",
subtask_result.stage_id,
subtask_result.subtask_id,
subtask_result,
)
return
stage_processor = self._cur_stage_processor
stage_processor = (
self._cur_stage_processor
or self._stage_id_to_processor[subtask_result.stage_id]
)
subtask = stage_processor.subtask_id_to_subtask[subtask_result.subtask_id]

prev_result = stage_processor.subtask_results.get(subtask)
if prev_result and (
prev_result.status == SubtaskStatus.succeeded
or prev_result.progress > subtask_result.progress
):
logger.info(
"Skip set subtask %s with result %s, previous result is %s.",
subtask.subtask_id,
subtask_result,
prev_result,
if prev_result and prev_result.bands and prev_result.bands[0] is not None:
logger.debug(
"Previous result of subtask %s is %s.", subtask.subtask_id, prev_result
)
# For duplicate run of subtasks, if the progress is smaller or the subtask has finished or canceled
# in task speculation, just do nothing.
# TODO(chaokunyang) If duplicate run of subtasks failed, it may be the fault in worker node,
# print the exception, and if multiple failures on the same node, remove the node from the cluster.
return
if (
subtask_result.status == SubtaskStatus.cancelled
or prev_result.progress > subtask_result.progress
):
logger.info(
"Skip set subtask %s with result %s, previous result is %s.",
subtask.subtask_id,
subtask_result,
prev_result,
)
# For duplicate run of subtasks, if the progress is smaller or the subtask has finished or canceled
# in task speculation, just do nothing.
# TODO(chaokunyang) If duplicate run of subtasks failed, it may be the fault in worker node,
# print the exception, and if multiple failures on the same node, remove the node from the cluster.
return

if subtask_result.bands:
[band] = subtask_result.bands
else:
band = None
stage_processor.subtask_snapshots[subtask] = subtask_result.update(
stage_processor.subtask_snapshots.get(subtask)
)

stage_processor.subtask_snapshots[subtask] = subtask_result
if subtask_result.status.is_done:
# update stage_processor.subtask_results to avoid concurrent set_subtask_result
# since we release lock when `_decref_input_subtasks`.
stage_processor.subtask_results[subtask] = subtask_result.update(
stage_processor.subtask_results.get(subtask)
)
stage_processor.subtask_results[subtask] = subtask_result
try:
# Since every worker will call supervisor to set subtask result,
# we need to release actor lock to make `decref_chunks` parallel to avoid blocking
@@ -323,6 +319,9 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
def get_stage_processors(self):
return self._stage_processors

def get_stage_generation_order(self, stage_id):
return self._stage_id_to_processor[stage_id].generation_order

async def _incref_fetch_tileables(self):
# incref fetch tileables in tileable graph to prevent them from deleting
to_incref_tileable_keys = [
378 changes: 315 additions & 63 deletions mars/services/task/execution/mars/stage.py

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions mars/services/task/execution/ray/executor.py
@@ -1074,3 +1074,8 @@ def gc():
last_check_slow_time = curr_time
# Fast to next loop and give it a chance to update object_ref_to_subtask.
await asyncio.sleep(interval_seconds if len(ready_objects) == 0 else 0)

def get_stage_generation_order(self, stage_id: str):
raise NotImplementedError(
"RayTaskExecutor doesn't support stage generation order."
)
45 changes: 45 additions & 0 deletions mars/services/task/supervisor/manager.py
@@ -15,6 +15,7 @@
import asyncio
import contextlib
import importlib
import itertools
import logging
import time
import weakref
@@ -87,6 +88,7 @@ def __init__(self, session_id: str):

self._task_id_to_processor_ref = dict()
self._result_tileable_key_to_info = defaultdict(list)
self._counter = itertools.count()

async def __post_create__(self):
# get config
@@ -154,6 +156,7 @@ async def submit_tileable_graph(
graph,
fuse_enabled=fuse_enabled,
extra_config=extra_config,
counter=self._counter,
)
# gen task processor
tiled_context = await self._gen_tiled_context(graph)
@@ -377,3 +380,45 @@ async def remove_tileables(self, tileable_keys: List[str]):
if not is_done
]
self._result_tileable_key_to_info[key] = not_done_info

def get_generation_order(self, task_id, stage_id):
"""
Gets generation order of a stage subtask graph in a task.
Parameters
----------
task_id: task id
stage_id: stage id
Returns
-------
out: an integer
Exceptions
-------
May throw KeyError
"""
return self._task_id_to_processor_ref[task_id].get_generation_order(
task_id, stage_id
)

async def get_subtask(self, chunk_data_key: str):
"""
Gets subtask by task id and chunk data key, i.e. the subtask who
generated the chunk data.
Parameters
----------
task_id: task id
chunk_data_key: key of a chunk data
Returns
-------
a subtask
"""
for task_id, processor_ref in self._task_id_to_processor_ref.items():
subtask = await processor_ref.get_subtask(task_id, chunk_data_key)
if subtask is not None:
return subtask

raise KeyError(f"Chunk data key {chunk_data_key} does not exist.")
21 changes: 20 additions & 1 deletion mars/services/task/supervisor/processor.py
@@ -29,7 +29,8 @@
)
from ....typing import TileableType, ChunkType
from ....utils import Timer
from ...subtask import SubtaskResult, Subtask
from ...context import FailOverContext
from ...subtask import SubtaskResult, SubtaskGraph, Subtask
from ..core import Task, TaskResult, TaskStatus, new_task_id
from ..execution.api import TaskExecutor, ExecutionChunkResult
from .preprocessor import TaskPreprocessor
@@ -58,6 +59,10 @@ def __init__(
self._tileable_id_to_tileable = dict()
self._chunk_to_subtasks = dict()
self._stage_tileables = set()
# Note: Record subtask lineage info for fault recovery, key is output
# chunk data of a `SubtaskGraph`, value is subtask that generates the
# chunk data.
self._chunk_data_key_to_subtask = dict()

if MARS_ENABLE_PROFILING:
ProfilingData.init(task.task_id)
@@ -241,6 +246,8 @@ async def _process_stage_chunk_graph(
},
)

self._record_subtask_lineage_info(subtask_graph)

tile_context = await asyncio.to_thread(
self._get_stage_tile_context,
{c for c in chunk_graph.result_chunks if not isinstance(c.op, Fetch)},
@@ -261,6 +268,12 @@ async def _process_stage_chunk_graph(
optimization_records = None
self._update_stage_meta(chunk_to_result, tile_context, optimization_records)

def _record_subtask_lineage_info(self, subtask_graph: SubtaskGraph):
if FailOverContext.is_lineage_enabled():
for subtask in subtask_graph.iter_indep(reverse=True):
for chunk_data in subtask.chunk_graph.result_chunks:
self._chunk_data_key_to_subtask[chunk_data.key] = subtask

def _get_stage_tile_context(self, result_chunks: Set[Chunk]) -> TileContext:
collected = self._stage_tileables
tile_context = TileContext()
@@ -468,3 +481,9 @@ def _finish(self):

def is_done(self) -> bool:
return self.done.is_set()

def get_generation_order(self, stage_id: str):
return self._executor.get_stage_generation_order(stage_id)

def get_subtask(self, chunk_data_key: str):
return self._chunk_data_key_to_subtask.get(chunk_data_key)
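
A hypothetical, self-contained illustration of the lineage record built above: each result chunk key of the final subtasks yielded by subtask_graph.iter_indep(reverse=True) is mapped to the subtask that produced it, so a lost chunk can be traced back to its producer (identifiers below are made up):

# chunk data key -> subtask that produced it
chunk_data_key_to_subtask = {
    "chunk-key-1": "subtask-a",
    "chunk-key-2": "subtask-a",
    "chunk-key-3": "subtask-b",
}

def get_subtask(chunk_data_key):
    return chunk_data_key_to_subtask.get(chunk_data_key)

# when chunk-key-3 is lost with its worker, subtask-b can be rescheduled to regenerate it
assert get_subtask("chunk-key-3") == "subtask-b"
assert get_subtask("missing-key") is None
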
15 changes: 15 additions & 0 deletions mars/services/task/supervisor/task.py
@@ -416,9 +416,24 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
)
if self._cur_processor is not None:
yield self._cur_processor.set_subtask_result(subtask_result)
else:
logger.info(
"Could not set subtask %s result because current processor is "
"None, maybe the subtask is in the history task, so try to "
"use the history task processor.",
subtask_result.subtask_id,
)
task_processor = self._task_id_to_processor[subtask_result.task_id]
yield task_processor.set_subtask_result(subtask_result)

def is_done(self) -> bool:
for processor in self._task_id_to_processor.values():
if not processor.is_done():
return False
return True

def get_generation_order(self, task_id, stage_id):
return self._task_id_to_processor[task_id].get_generation_order(stage_id)

def get_subtask(self, task_id, chunk_data_key):
return self._task_id_to_processor[task_id].get_subtask(chunk_data_key)
1 change: 1 addition & 0 deletions mars/services/task/supervisor/tests/task_preprocessor.py
@@ -177,6 +177,7 @@ def analyze(
self._config,
chunk_to_subtasks,
shuffle_fetch_type=shuffle_fetch_type,
stage_id=stage_id,
)
subtask_graph = analyzer.gen_subtask_graph()
results = set(
26 changes: 26 additions & 0 deletions mars/services/tests/fault_injection_manager.py
@@ -96,3 +96,29 @@ async def create(cls, session_id, supervisor_address):
"""
session_api = await SessionAPI.create(supervisor_address)
await session_api.create_remote_object(session_id, cls.name, cls)


async def create_fault_injection_manager(
session_id, address, fault_count, fault_type, fault_op_types=None
):
class FaultInjectionManager(AbstractFaultInjectionManager):
def __init__(self):
self._fault_count = fault_count

def set_fault_count(self, count):
self._fault_count = count

def get_fault_count(self):
return self._fault_count

def get_fault(self, pos: FaultPosition, ctx=None) -> FaultType:
# Check op types if fault_op_types provided.
if fault_op_types and type(ctx.get("operand")) not in fault_op_types:
return FaultType.NoFault
if self._fault_count.get(pos, 0) > 0:
self._fault_count[pos] -= 1
return fault_type
return FaultType.NoFault

await FaultInjectionManager.create(session_id, address)
return FaultInjectionManager.name
19 changes: 0 additions & 19 deletions mars/storage/errors.py

This file was deleted.

2 changes: 1 addition & 1 deletion mars/storage/plasma.py
@@ -23,12 +23,12 @@
import psutil
import pyarrow as pa

from ..oscar.errors import DataNotExist
from ..resource import virtual_memory
from ..serialization import AioSerializer, AioDeserializer
from ..utils import implements, dataslots, calc_size_by_str, lazy_import
from .base import StorageBackend, StorageLevel, ObjectInfo, register_storage_backend
from .core import BufferWrappedFileObject, StorageFileObject
from .errors import DataNotExist

plasma = lazy_import("pyarrow.plasma", rename="plasma")
if sys.platform.startswith("win"):
14 changes: 12 additions & 2 deletions mars/storage/ray.py
@@ -13,10 +13,12 @@
# limitations under the License.

import inspect
import logging

from typing import Any, Dict, List, Tuple
from ..lib import sparse
from ..oscar.debug import debug_async_timeout
from ..oscar.errors import DataNotExist
from ..utils import (
lazy_import,
implements,
@@ -27,6 +29,7 @@
from .core import BufferWrappedFileObject, StorageFileObject

ray = lazy_import("ray")
logger = logging.getLogger(__name__)


# TODO(fyrestone): make the SparseMatrix pickleable.
@@ -207,8 +210,15 @@ async def get(self, object_id, **kwargs) -> object:
"Storage get object timeout, ObjectRef: %s",
object_id,
):
with record_time_cost_percentile(self._storage_get_metrics):
return await object_id
try:
with record_time_cost_percentile(self._storage_get_metrics):
return await object_id
except ray.exceptions.ObjectLostError:
logger.exception("ray.get failed.")
raise DataNotExist(f"Object {object_id} is lost due to node failure.")
except Exception:
logger.exception("ray.get failed.")
raise

@implements(StorageBackend.put)
async def put(self, obj, importance=0) -> ObjectInfo:
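
A sketch of how a caller might react to the new exception; the storage object and key below are placeholders, and the actual recovery wiring lives in the scheduling and task services:

from mars.oscar.errors import DataNotExist

async def fetch_or_recover(storage, key):
    try:
        return await storage.get(key)
    except DataNotExist:
        # the object was lost with its node; with failover.enable_lineage the
        # producing subtask can be looked up and rescheduled instead of failing
        return None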