Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
[RestServer, Webportal, Runtime] Change tensorboard to use runtime pl…
Browse files Browse the repository at this point in the history
…ugin (#4105)

Change tensorboard to use runtime plugin
Example
```
    - plugin: tensorboard
      parameters:
        port: 13633
        logdir:
          path: /mnt/tensorboard
```
Binyang2014 authored Jan 10, 2020
1 parent ce5f346 commit 4e102ec
Showing 17 changed files with 233 additions and 299 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/continuous-integration.yml
Original file line number Diff line number Diff line change
@@ -89,7 +89,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install kubernetes pyyaml requests pylint
pip install kubernetes pyyaml requests jinja2 pylint
- name: Lint kube-runtime
run: |
cd src/kube-runtime
2 changes: 1 addition & 1 deletion src/kube-runtime/build/kube-runtime.dockerfile
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ RUN ${PROJECT_DIR}/build/runtime/go-build.sh && \

FROM python:3.7-alpine

RUN pip install kubernetes pyyaml requests
RUN pip install kubernetes pyyaml requests jinja2

ENV INSTALL_DIR=/opt/kube-runtime
ARG BARRIER_DIR=/opt/frameworkcontroller/frameworkbarrier
2 changes: 1 addition & 1 deletion src/kube-runtime/src/init.d/image_checker.py
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ def _get_docker_repository_name(image_name):
def _is_docker_image_valid(job_config):
prerequisites = job_config["prerequisites"]

task_role_name = os.getenv("FC_TASKROLE_NAME")
task_role_name = os.getenv("PAI_CURRENT_TASK_ROLE_NAME")
task_role = job_config["taskRoles"][task_role_name]
docker_image_name = task_role["dockerImage"]

33 changes: 28 additions & 5 deletions src/kube-runtime/src/plugins/tensorboard/init.py
Original file line number Diff line number Diff line change
@@ -20,27 +20,50 @@
import os
import sys

from jinja2 import Template

sys.path.append(
os.path.join(os.path.dirname(os.path.abspath(__file__)), "../.."))
from plugins.plugin_utils import plugin_init, PluginHelper #pylint: disable=wrong-import-position

# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)
# Raw value of PAI_CURRENT_TASK_ROLE_NAME (None when the variable is unset);
# main() compares it with the first entry of TASK_ROLE_LIST.
TASK_ROLE_NAME = os.getenv("PAI_CURRENT_TASK_ROLE_NAME")
# PAI_TASK_ROLE_LIST split on commas. os.getenv returns None for an unset
# variable, so importing this module without it raises AttributeError.
TASK_ROLE_LIST = os.getenv("PAI_TASK_ROLE_LIST").split(",")
# PAI_CURRENT_TASK_ROLE_CURRENT_TASK_INDEX parsed as an integer; an unset
# variable makes int(None) raise TypeError at import time.
TASK_ROLE_INDEX = int(os.getenv("PAI_CURRENT_TASK_ROLE_CURRENT_TASK_INDEX"))


def generate_tensorboard_commands(template_file, parameters):
    """Render the tensorboard launch script from a Jinja2 template.

    Args:
        template_file: Path of the template file to read.
        parameters: Plugin parameters; ``parameters["logdir"]`` is a mapping
            whose items are joined as ``key:value`` pairs separated by
            commas, and ``parameters["port"]`` is passed through as-is.

    Returns:
        The rendered template text.

    Raises:
        KeyError: If ``logdir`` or ``port`` is missing from ``parameters``.
    """
    logdir_pairs = []
    for name, path in parameters["logdir"].items():
        logdir_pairs.append("{}:{}".format(name, path))
    logdir = ",".join(logdir_pairs)

    with open(template_file) as template_fp:
        template_text = template_fp.read()

    return Template(template_text).render(logdir=logdir,
                                          port=parameters["port"])


def main():
    """Prepare and inject the commands that start tensorboard for this job.

    The plugin configuration and pre-script come from ``plugin_init()``.
    Nothing is injected unless this process is the first task instance
    (index 0) of the first task role in ``TASK_ROLE_LIST`` and the plugin
    has non-empty ``parameters``.

    Otherwise ``tensorboard.sh`` is rendered next to this file from
    ``tensorboard.sh.template`` and two commands are injected through
    ``PluginHelper``: one marking the script executable and one running it.
    """
    LOGGER.info("Preparing tensorboard runtime plugin commands")

    [plugin_config, pre_script, _] = plugin_init()
    parameters = plugin_config.get("parameters")

    if TASK_ROLE_LIST[0] != TASK_ROLE_NAME or TASK_ROLE_INDEX != 0:
        LOGGER.info(
            "Not first taskrole or not first task instance, ignore this plugin"
        )
        return
    if not parameters:
        LOGGER.info("Tensorboard plugin parameters is empty, ignore this")
        return

    current_dir = os.path.dirname(os.path.abspath(__file__))
    template_file = "{}/tensorboard.sh.template".format(current_dir)
    tensorboard_exec_path = "{}/tensorboard.sh".format(current_dir)

    # Render before opening the output file so that a missing parameter or
    # unreadable template does not leave an empty tensorboard.sh behind.
    script_content = generate_tensorboard_commands(template_file, parameters)
    with open(tensorboard_exec_path, "w+") as f:
        f.write(script_content)

    commands = [
        "chmod u+x {}".format(tensorboard_exec_path), tensorboard_exec_path
    ]

    PluginHelper(plugin_config).inject_commands(commands, pre_script)
    LOGGER.info("Tensorboard runtime plugin perpared")
31 changes: 31 additions & 0 deletions src/kube-runtime/src/plugins/tensorboard/tensorboard.sh.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/bash
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

# Abort on command failure, on use of an unset variable, and on a failure
# anywhere in a pipeline.
set -o errexit
set -o nounset
set -o pipefail

# Ask the installed tensorflow package for its version and keep only the
# first character as the major version.
TENSORFLOW_VERSION=$(python -c 'import tensorflow as tf; print(tf.__version__)')
MAJOR_VERSION=${TENSORFLOW_VERSION:0:1}

# {{ logdir }} and {{ port }} are filled in when the template is rendered.
# NOTE(review): newer TensorBoard releases document --logdir_spec for the
# comma-separated name:path form -- confirm --logdir still accepts it.
case "$MAJOR_VERSION" in
  1)
    tensorboard --logdir={{ logdir }} --port={{ port }} &
    ;;
  2)
    # --bind_all is only passed for major version 2.
    tensorboard --logdir={{ logdir }} --port={{ port }} --bind_all &
    ;;
  *)
    echo "Tensorflow version is ${TENSORFLOW_VERSION}, not support"
    ;;
esac
4 changes: 2 additions & 2 deletions src/kube-runtime/test/test_image_checker.py
Original file line number Diff line number Diff line change
@@ -40,12 +40,12 @@ def prepare_image_check(job_config_path):
def decorator(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
os.environ["FC_TASKROLE_NAME"] = "worker"
os.environ["PAI_CURRENT_TASK_ROLE_NAME"] = "worker"
if os.path.exists(job_config_path):
with open(job_config_path, 'r') as f:
self.config = yaml.load(f, Loader=yaml.FullLoader)
func(self, *args, **kwargs)
del os.environ["FC_TASKROLE_NAME"]
del os.environ["PAI_CURRENT_TASK_ROLE_NAME"]

return wrapper

80 changes: 38 additions & 42 deletions src/rest-server/src/models/v2/job/k8s.js
Original file line number Diff line number Diff line change
@@ -337,7 +337,7 @@ const convertFrameworkDetail = async (framework) => {
return detail;
};

const generateTaskRole = (frameworkName, taskRole, jobInfo, config, storageConfig) => {
const generateTaskRole = (frameworkName, taskRole, jobInfo, frameworkEnvList, config, storageConfig) => {
const ports = config.taskRoles[taskRole].resourcePerInstance.ports || {};
for (let port of ['ssh', 'http']) {
if (!(port in ports)) {
@@ -372,6 +372,21 @@ const generateTaskRole = (frameworkName, taskRole, jobInfo, config, storageConfi
retryPolicy.maxRetryCount = config.taskRoles[taskRole].taskRetryCount || 0;
}

const taskRoleEnvList = [
{
name: 'PAI_CURRENT_TASK_ROLE_NAME',
value: taskRole,
},
{
name: 'PAI_CURRENT_TASK_ROLE_CURRENT_TASK_INDEX',
valueFrom: {
fieldRef: {
fieldPath: `metadata.annotations['FC_TASK_INDEX']`,
},
},
},
];

const frameworkTaskRole = {
name: convertName(taskRole),
taskNumber: config.taskRoles[taskRole].instances || 1,
@@ -412,18 +427,12 @@ const generateTaskRole = (frameworkName, taskRole, jobInfo, config, storageConfi
name: 'GANG_ALLOCATION',
value: gangAllocation,
},
{
name: 'PAI_USER_NAME',
value: jobInfo.userName,
},
{
name: 'PAI_JOB_NAME',
value: `${jobInfo.userName}~${jobInfo.jobName}`,
},
{
name: 'STORAGE_CONFIGS',
value: JSON.stringify(storageConfig),
},
...frameworkEnvList,
...taskRoleEnvList,
],
volumeMounts: [
{
@@ -457,7 +466,19 @@ const generateTaskRole = (frameworkName, taskRole, jobInfo, config, storageConfi
...infinibandDevice && {'rdma/hca': 1},
},
},
env: [],
env: [
...frameworkEnvList,
...taskRoleEnvList,
// backward compatibility
{
name: 'PAI_TASK_INDEX',
valueFrom: {
fieldRef: {
fieldPath: `metadata.annotations['FC_TASK_INDEX']`,
},
},
},
],
securityContext: {
capabilities: {
add: ['SYS_ADMIN', 'IPC_LOCK', 'DAC_READ_SEARCH'],
@@ -622,44 +643,19 @@ const generateFrameworkDescription = (frameworkName, virtualCluster, config, raw
taskRoles: [],
},
};
// generate runtime env
const env = runtimeEnv.generateFrameworkEnv(frameworkName, config);
const envlist = Object.keys(env).map((name) => {
return {name, value: `${env[name]}`};

// generate framework env
const frameworkEnv = runtimeEnv.generateFrameworkEnv(frameworkName, config);
const frameworkEnvList = Object.keys(frameworkEnv).map((name) => {
return {name, value: `${frameworkEnv[name]}`};
});

// fill in task roles
let totalGpuNumber = 0;
for (let taskRole of Object.keys(config.taskRoles)) {
totalGpuNumber += config.taskRoles[taskRole].resourcePerInstance.gpu * config.taskRoles[taskRole].instances;
const taskRoleDescription = generateTaskRole(frameworkName, taskRole, jobInfo, config, storageConfig);
const taskRoleDescription = generateTaskRole(frameworkName, taskRole, jobInfo, frameworkEnvList, config, storageConfig);
taskRoleDescription.task.pod.spec.priorityClassName = `${encodeName(frameworkName)}-priority`;
taskRoleDescription.task.pod.spec.containers[0].env.push(...envlist.concat([
{
name: 'PAI_CURRENT_TASK_ROLE_NAME',
valueFrom: {
fieldRef: {
fieldPath: `metadata.annotations['FC_TASKROLE_NAME']`,
},
},
},
{
name: 'PAI_CURRENT_TASK_ROLE_CURRENT_TASK_INDEX',
valueFrom: {
fieldRef: {
fieldPath: `metadata.annotations['FC_TASK_INDEX']`,
},
},
},
// backward compatibility
{
name: 'PAI_TASK_INDEX',
valueFrom: {
fieldRef: {
fieldPath: `metadata.annotations['FC_TASK_INDEX']`,
},
},
},
]));
frameworkDescription.spec.taskRoles.push(taskRoleDescription);
}
frameworkDescription.metadata.annotations.totalGpuNumber = `${totalGpuNumber}`;
Original file line number Diff line number Diff line change
@@ -44,9 +44,8 @@ import { submitJob } from '../utils/conn';
import MonacoPanel from '../../components/monaco-panel';
import Card from '../../components/card';
import {
populateProtocolWithDataAndTensorboard,
populateProtocolWithData,
getJobComponentsFromConfig,
isValidUpdatedTensorBoardExtras,
} from '../utils/utils';
import Context from './context';
import { FormShortSection } from './form-page';
@@ -120,7 +119,7 @@ export const SubmissionSection = props => {
);
_protocolAndErrorUpdate(protocol);
try {
await populateProtocolWithDataAndTensorboard(user, protocol, jobData);
await populateProtocolWithData(user, protocol, jobData);
setProtocolYaml(protocol.toYaml());
} catch (err) {
alert(err);
@@ -146,18 +145,6 @@ export const SubmissionSection = props => {
updatedExtras,
] = getJobComponentsFromConfig(updatedJob, { vcNames });

if (extras.tensorBoard) {
const updatedTensorBoardExtras = updatedExtras.tensorBoard || {};
if (
!isValidUpdatedTensorBoardExtras(
extras.tensorBoard,
updatedTensorBoardExtras,
)
) {
updatedExtras.tensorBoard = extras.tensorBoard;
}
}

onChange(
updatedJobInformation,
updatedTaskRoles,
@@ -199,7 +186,7 @@ export const SubmissionSection = props => {
event.preventDefault();
const protocol = cloneDeep(jobProtocol);
try {
await populateProtocolWithDataAndTensorboard(user, protocol, jobData);
await populateProtocolWithData(user, protocol, jobData);
await submitJob(protocol.toYaml());
window.location.href = `/job-detail.html?username=${user}&jobName=${protocol.name}`;
} catch (err) {
Loading

0 comments on commit 4e102ec

Please sign in to comment.