Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

azure: add worker plugin for spot instances #251

Merged
merged 2 commits into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dask_cloudprovider/azure/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .azurevm import AzureVMCluster
from .utils import AzurePreemptibleWorkerPlugin
178 changes: 178 additions & 0 deletions dask_cloudprovider/azure/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import asyncio
import datetime
import logging

import aiohttp
from distributed.diagnostics.plugin import WorkerPlugin
from tornado.ioloop import IOLoop, PeriodicCallback


logger = logging.getLogger(__name__)


class AzurePreemptibleWorkerPlugin(WorkerPlugin):
"""A worker plugin for azure spot instances

This worker plugin will poll azure's metadata service for preemption notifications.
When a node is preempted, the plugin will attempt to gracefully shut down all
workers on the node.

jacobtomlinson marked this conversation as resolved.
Show resolved Hide resolved
For more details on azure spot instances see:
https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events

Parameters
----------
poll_interval_s: int (optional)
The interval, in seconds, at which the plugin polls the metadata service.

Defaults to ``1``

metadata_url: str (optional)
The url of the metadata service to poll.

Defaults to "http://169.254.169.254/metadata/scheduledevents?api-version=2019-08-01"

termination_events: List[str] (optional)
The types of events that will trigger the graceful shutdown

Defaults to ``['Preempt', 'Terminate']``

termination_offset_minutes: int (optional)
Extra offset to apply to the preemption date. This may be negative, to start
the graceful shutdown before the ``NotBefore`` date. It can also be positive, to
start the shutdown after the ``NotBefore`` date, but this is at your own risk.

Defaults to ``0``

Examples
--------

Let's say you have a cluster and a client instance. For example using ``KubeCluster``
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this a full reference to span the two sets of docs. Will need to add dask-kubernetes to the intersphinx mapping.

intersphinx_mapping = {
"python": ("https://docs.python.org/3", None),
"dask": ("https://docs.dask.org/en/latest/", None),
"distributed": ("https://distributed.dask.org/en/latest/", None),
}

Suggested change
Let's say you have cluster and a client instance. For example using ``KubeCluster``
Let's say you have cluster and a client instance. For example using :class:`dask_kubernetes.KubeCluster`


>>> from dask_kubernetes import KubeCluster
>>> from distributed import Client
>>> cluster = KubeCluster()
>>> client = Client(cluster)

You can add the worker plugin using the following:
>>> from dask_cloudprovider.azure import AzurePreemptibleWorkerPlugin
>>> client.register_worker_plugin(AzurePreemptibleWorkerPlugin())
"""

def __init__(
    self,
    poll_interval_s=1,
    metadata_url=None,
    termination_events=None,
    termination_offset_minutes=0,
):
    """Record the polling configuration and reset the runtime state.

    Only attribute assignments happen here; nothing is contacted or scheduled.

    Parameters
    ----------
    poll_interval_s: int (optional)
        Seconds between two polls of the metadata service.
    metadata_url: str (optional)
        Metadata endpoint to poll; a falsy value selects the built-in default.
    termination_events: List[str] (optional)
        Event types that trigger the shutdown; a falsy value selects
        ``["Preempt", "Terminate"]``.
    termination_offset_minutes: int (optional)
        Minutes added to the announced ``NotBefore`` date (may be negative).
    """
    # Resolve the optional arguments to their defaults first.
    if not metadata_url:
        # NOTE(review): a reviewer suggested that this URL should probably be
        # a constant defined at the top of the file.
        metadata_url = (
            "http://169.254.169.254/metadata/scheduledevents?api-version=2019-08-01"
        )
    if not termination_events:
        termination_events = ["Preempt", "Terminate"]

    # User-facing configuration.
    self.poll_interval_s = poll_interval_s
    self.metadata_url = metadata_url
    self.termination_events = termination_events
    self.termination_offset = datetime.timedelta(minutes=termination_offset_minutes)

    # Handles that start out unset.
    self.worker = None
    self.loop = None
    self.callback = None
    self._session = None
    self._lock = None

    # Shutdown bookkeeping.
    self.terminating = False
    self.not_before = None

async def _is_terminating(self):
    """Poll the metadata service once and decide whether shutdown should begin.

    Fetches ``self.metadata_url`` and walks the ``Events`` list of the JSON
    response. Events whose ``EventType`` is not in ``self.termination_events``
    are ignored. A matching event with ``EventStatus == "Started"`` means the
    preemption is already under way. Otherwise a ``NotBefore`` timestamp that
    is new, or later than the one already recorded, is stored in
    ``self.not_before``.

    Returns
    -------
    bool
        ``True`` when a matching event has started, or when the recorded
        ``NotBefore`` date plus ``self.termination_offset`` lies in the past.
        ``False`` otherwise, including when the response could not be decoded
        as JSON.
    """
    async with self._session.get(self.metadata_url) as response:
        try:
            data = await response.json()
        except aiohttp.ContentTypeError:
            # Sometimes azure responds with a text/plain mime type; treat this
            # poll as inconclusive and try again on the next tick.
            return False

    preempt_started = False
    # Sometimes the response doesn't contain the Events key
    events = data.get("Events", [])
    if events:
        logger.debug(
            "Worker {}, got metadata events {}".format(self.worker.name, events)
        )
    for evt in events:
        # .get() so that an event without an EventType is skipped rather than
        # aborting the whole poll with a KeyError.
        if evt.get("EventType") not in self.termination_events:
            continue

        if evt.get("EventStatus") == "Started":
            logger.info(
                "Worker {}, node preemption started".format(self.worker.name)
            )
            preempt_started = True
            break

        raw_not_before = evt.get("NotBefore")
        if not raw_not_before:
            continue

        # Parsed into a naive datetime; it is compared against utcnow() below.
        not_before = datetime.datetime.strptime(
            raw_not_before, "%a, %d %b %Y %H:%M:%S GMT"
        )
        if self.not_before is None:
            # Log the freshly parsed date: self.not_before is still None here.
            logger.info(
                "Worker {}, node deletion scheduled not before {}".format(
                    self.worker.name, not_before
                )
            )
            self.not_before = not_before
            break
        if self.not_before < not_before:
            logger.info(
                "Worker {}, node deletion re-scheduled not before {}".format(
                    self.worker.name, not_before
                )
            )
            self.not_before = not_before
            break

    if preempt_started:
        return True
    if self.not_before is None:
        return False
    deadline = self.not_before + self.termination_offset
    return deadline < datetime.datetime.utcnow()

async def poll_status(self):
    """Run one poll and start the graceful worker shutdown when required.

    The HTTP session (sent with the ``Metadata: true`` header) and the lock
    are created on first use. When ``_is_terminating`` reports a termination,
    the plugin is marked as terminating, the session is closed and
    ``worker.close_gracefully()`` is awaited.
    """
    if self.terminating:
        return
    if self._session is None:
        self._session = aiohttp.ClientSession(headers={"Metadata": "true"})
    if self._lock is None:
        self._lock = asyncio.Lock()

    async with self._lock:
        # Re-check under the lock: a poll that was waiting here while another
        # one initiated the shutdown must not touch the now-closed session.
        if self.terminating:
            return

        is_terminating = await self._is_terminating()
        if not is_terminating:
            return

        logger.info(
            "Worker {}, node is being deleted, attempting graceful shutdown".format(
                self.worker.name
            )
        )
        self.terminating = True
        await self._session.close()
        await self.worker.close_gracefully()

def setup(self, worker):
    """Attach the plugin to ``worker`` and schedule the periodic poll.

    Stores the worker and the current ``IOLoop``, builds a
    ``PeriodicCallback`` around ``poll_status`` and asks the loop to start it.
    """
    self.worker = worker
    self.loop = IOLoop.current()

    # PeriodicCallback expects milliseconds; the plugin is configured in seconds.
    interval_ms = self.poll_interval_s * 1_000
    poller = PeriodicCallback(self.poll_status, callback_time=interval_ms)
    self.callback = poller
    self.loop.add_callback(poller.start)

    logger.debug(
        "Worker {}, registering preemptible plugin".format(self.worker.name)
    )

def teardown(self, worker):
    """Stop the periodic poll, if one is registered, and forget it."""
    logger.debug("Worker {}, tearing down plugin".format(self.worker.name))
    if not self.callback:
        # Nothing was scheduled (or it was already torn down).
        return
    self.callback.stop()
    self.callback = None
8 changes: 7 additions & 1 deletion doc/source/azure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,10 @@ AzureVM
-------

.. autoclass:: AzureVMCluster
:members:
:members:

Azure Spot Instance Plugin
--------------------------

.. autoclass:: AzurePreemptibleWorkerPlugin
:members:
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
aiohttp>=3.7.3
dask>=2021.01.1
distributed>=2021.01.1
jinja2
tornado >= 5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tornado >= 5
tornado>=5