Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
[Kubeflow training service] Use Kubernete API server to replace kubec…
Browse files Browse the repository at this point in the history
…tl dependency (#472)

[Kubeflow training service] Use Kubernete API server to replace kubectl dependency
  • Loading branch information
yds05 authored Dec 13, 2018
1 parent 07e19a3 commit d8e5516
Show file tree
Hide file tree
Showing 17 changed files with 477 additions and 146 deletions.
2 changes: 2 additions & 0 deletions examples/trials/mnist-annotation/config_kubeflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ trial:
image: msranni/nni:latest
kubeflowConfig:
operator: tf-operator
apiVersion: v1alpha2
storage: nfs
nfs:
server: 10.10.10.10
path: /var/nfs/general
2 changes: 2 additions & 0 deletions examples/trials/mnist-distributed/config_kubeflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ trial:
image: msranni/nni:latest
kubeflowConfig:
operator: tf-operator
apiVersion: v1alpha2
storage: nfs
nfs:
# Your NFS server IP, like 10.10.10.10
server: {your_nfs_server_ip}
Expand Down
2 changes: 2 additions & 0 deletions examples/trials/mnist-smartparam/config_kubeflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ trial:
image: msranni/nni:latest
kubeflowConfig:
operator: tf-operator
apiVersion: v1alpha2
storage: nfs
nfs:
server: 10.10.10.10
path: /var/nfs/general
2 changes: 2 additions & 0 deletions examples/trials/mnist/config_kubeflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ trial:
image: msranni/nni:latest
kubeflowConfig:
operator: tf-operator
apiVersion: v1alpha2
storage: nfs
nfs:
server: 10.10.10.10
path: /var/nfs/general
17 changes: 17 additions & 0 deletions src/nni_manager/config/kubeflow/pytorchjob-crd-v1alpha2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"kind": "CustomResourceDefinition",
"spec": {
"scope": "Namespaced",
"version": "v1alpha2",
"group": "kubeflow.org",
"names": {
"kind": "PyTorchJob",
"plural": "pytorchjobs",
"singular": "pytorchjob"
}
},
"apiVersion": "apiextensions.k8s.io/v1beta1",
"metadata": {
"name": "pytorchjobs.kubeflow.org"
}
}
17 changes: 17 additions & 0 deletions src/nni_manager/config/kubeflow/pytorchjob-crd-v1beta1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"kind": "CustomResourceDefinition",
"spec": {
"scope": "Namespaced",
"version": "v1beta1",
"group": "kubeflow.org",
"names": {
"kind": "PyTorchJob",
"plural": "pytorchjobs",
"singular": "pytorchjob"
}
},
"apiVersion": "apiextensions.k8s.io/v1beta1",
"metadata": {
"name": "pytorchjobs.kubeflow.org"
}
}
17 changes: 17 additions & 0 deletions src/nni_manager/config/kubeflow/tfjob-crd-v1alpha2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"kind": "CustomResourceDefinition",
"spec": {
"scope": "Namespaced",
"version": "v1alpha2",
"group": "kubeflow.org",
"names": {
"kind": "TFJob",
"plural": "tfjobs",
"singular": "tfjob"
}
},
"apiVersion": "apiextensions.k8s.io/v1beta1",
"metadata": {
"name": "tfjobs.kubeflow.org"
}
}
17 changes: 17 additions & 0 deletions src/nni_manager/config/kubeflow/tfjob-crd-v1beta1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"kind": "CustomResourceDefinition",
"spec": {
"scope": "Namespaced",
"version": "v1beta1",
"group": "kubeflow.org",
"names": {
"kind": "TFJob",
"plural": "tfjobs",
"singular": "tfjob"
}
},
"apiVersion": "apiextensions.k8s.io/v1beta1",
"metadata": {
"name": "tfjobs.kubeflow.org"
}
}
6 changes: 3 additions & 3 deletions src/nni_manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"version": "1.0.0",
"main": "index.js",
"scripts": {
"postbuild": "cp -rf scripts ./dist/",
"postbuild": "cp -rf scripts ./dist/ && cp -rf config ./dist/",
"build": "tsc",
"test": "mocha -r ts-node/register -t 15000 --recursive **/*.test.ts --colors",
"start": "node dist/main.js"
Expand All @@ -15,7 +15,6 @@
"express": "^4.16.3",
"express-joi-validator": "^2.0.0",
"node-nvidia-smi": "^1.0.0",
"node-yaml": "^3.1.1",
"rx": "^4.1.0",
"sqlite3": "^4.0.2",
"ssh2": "^0.6.1",
Expand All @@ -26,7 +25,8 @@
"typescript-ioc": "^1.2.4",
"typescript-string-operations": "^1.3.1",
"webhdfs":"^1.2.0",
"azure-storage": "^2.10.2"
"azure-storage": "^2.10.2",
"kubernetes-client": "^6.5.0"
},
"devDependencies": {
"@types/chai": "^4.1.4",
Expand Down
1 change: 1 addition & 0 deletions src/nni_manager/rest_server/restValidationSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export namespace ValidationSchemas {
kubeflow_config: joi.object({
operator: joi.string().min(1).required(),
storage: joi.string().min(1),
apiVersion: joi.string().min(1),
nfs: joi.object({
server: joi.string().min(1).required(),
path: joi.string().min(1).required()
Expand Down
49 changes: 19 additions & 30 deletions src/nni_manager/training_service/kubeflow/kubeflowConfig.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { TrialConfig } from "../common/trialConfig";

/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
Expand All @@ -21,28 +19,11 @@ import { TrialConfig } from "../common/trialConfig";

'use strict';


/** operator types that kubeflow supported */
export type KubeflowOperator = 'tf-operator' | 'pytorch-operator' ;
export type KubeflowOperatorPlural = 'tfjobs' | 'pytorchjobs' ;
export type KubeflowOperatorJobKind = 'TFJob' | 'PyTorchJob';
export type KubeflowStorageKind = 'nfs' | 'azureStorage';

/**
* map from Kubeflow operator name to its plural name in K8S
*/
export const kubeflowOperatorMap : Map<KubeflowOperator, KubeflowOperatorPlural> = new Map<KubeflowOperator, KubeflowOperatorPlural>([
['tf-operator' , 'tfjobs'],
['pytorch-operator', 'pytorchjobs']
]);

/**
* map from Kubeflow operator name to its job kind name in K8S
*/
export const kubeflowOperatorJobKindMap : Map<KubeflowOperator, KubeflowOperatorJobKind> = new Map<KubeflowOperator, KubeflowOperatorJobKind>([
['tf-operator' , 'TFJob'],
['pytorch-operator', 'PyTorchJob']
]);
export type DistTrainRole = 'worker' | 'ps' | 'master';
export type OperatorApiVersion = 'v1alpha2' | 'v1beta1';

/**
* Kuberflow cluster configuration
Expand All @@ -51,25 +32,29 @@ export const kubeflowOperatorJobKindMap : Map<KubeflowOperator, KubeflowOperator
export class KubeflowClusterConfigBase {
/** Name of Kubeflow operator, like tf-operator */
public readonly operator: KubeflowOperator;
public readonly storage?: KubeflowStorageKind;
public readonly apiVersion: OperatorApiVersion;
public readonly storage?: KubeflowStorageKind;

/**
* Constructor
* @param userName User name of Kubeflow Cluster
* @param passWord password of Kubeflow Cluster
* @param host Host IP of Kubeflow Cluster
*/
constructor(operator: KubeflowOperator, storage?: KubeflowStorageKind) {
constructor(operator: KubeflowOperator, apiVersion: OperatorApiVersion, storage?: KubeflowStorageKind) {
this.operator = operator;
this.apiVersion = apiVersion;
this.storage = storage;
}
}

export class KubeflowClusterConfigNFS extends KubeflowClusterConfigBase{
public readonly nfs: NFSConfig;

constructor(operator: KubeflowOperator, nfs: NFSConfig, storage?: KubeflowStorageKind) {
super(operator, storage)
constructor(operator: KubeflowOperator,
apiVersion: OperatorApiVersion,
nfs: NFSConfig, storage?: KubeflowStorageKind) {
super(operator, apiVersion, storage);
this.nfs = nfs;
}
}
Expand All @@ -78,8 +63,12 @@ export class KubeflowClusterConfigAzure extends KubeflowClusterConfigBase{
public readonly keyVault: keyVaultConfig;
public readonly azureStorage: AzureStorage;

constructor(operator: KubeflowOperator, keyVault: keyVaultConfig, azureStorage: AzureStorage, storage?: KubeflowStorageKind) {
super(operator, storage)
constructor(operator: KubeflowOperator,
apiVersion: OperatorApiVersion,
keyVault: keyVaultConfig,
azureStorage: AzureStorage,
storage?: KubeflowStorageKind) {
super(operator, apiVersion, storage);
this.keyVault = keyVault;
this.azureStorage = azureStorage;
}
Expand Down Expand Up @@ -184,10 +173,10 @@ export class KubeflowTrialConfigTensorflow extends KubeflowTrialConfigBase{
}

export class KubeflowTrialConfigPytorch extends KubeflowTrialConfigBase{
public readonly master?: KubeflowTrialConfigTemplate;
public readonly worker: KubeflowTrialConfigTemplate;
public readonly master: KubeflowTrialConfigTemplate;
public readonly worker?: KubeflowTrialConfigTemplate;

constructor(codeDir: string, worker: KubeflowTrialConfigTemplate, master?: KubeflowTrialConfigTemplate) {
constructor(codeDir: string, master: KubeflowTrialConfigTemplate, worker?: KubeflowTrialConfigTemplate) {
super(codeDir);
this.master = master;
this.worker = worker;
Expand Down
6 changes: 2 additions & 4 deletions src/nni_manager/training_service/kubeflow/kubeflowData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ export class KubeflowTrialJobDetail implements TrialJobDetail {
public kubeflowJobName: string;
public sequenceId: number;
public queryJobFailedCount: number;
public k8sPluralName: string


constructor(id: string, status: TrialJobStatus, submitTime: number,
workingDirectory: string, form: JobApplicationForm,
kubeflowJobName: string, sequenceId: number, url: string, k8sPluralName: string) {
kubeflowJobName: string, sequenceId: number, url: string) {
this.id = id;
this.status = status;
this.submitTime = submitTime;
Expand All @@ -53,7 +52,6 @@ export class KubeflowTrialJobDetail implements TrialJobDetail {
this.tags = [];
this.queryJobFailedCount = 0;
this.url = url;
this.k8sPluralName = k8sPluralName;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

'use strict';

import * as assert from 'assert';
import * as cpp from 'child-process-promise';
import { getLogger, Logger } from '../../common/log';
import { KubeflowTrialJobDetail, KubeflowTFJobType} from './kubeflowData';
import { NNIError, NNIErrorNames } from '../../common/errors';
import { TrialJobStatus } from '../../common/trainingService';
import { KubeflowOperatorClient } from './kubernetesApiClient';

/**
* Collector Kubeflow jobs info from Kubernetes cluster, and update kubeflow job status locally
Expand All @@ -32,14 +34,14 @@ export class KubeflowJobInfoCollector {
private readonly trialJobsMap : Map<string, KubeflowTrialJobDetail>;
private readonly log: Logger = getLogger();
private readonly statusesNeedToCheck: TrialJobStatus[];
private readonly MAX_FAILED_QUERY_JOB_NUMBER: number = 30;

constructor(jobMap: Map<string, KubeflowTrialJobDetail>) {
this.trialJobsMap = jobMap;
this.statusesNeedToCheck = ['RUNNING', 'WAITING'];
}

public async retrieveTrialStatus() : Promise<void> {
public async retrieveTrialStatus(operatorClient: KubeflowOperatorClient | undefined) : Promise<void> {
assert(operatorClient !== undefined);
const updateKubeflowTrialJobs : Promise<void>[] = [];
for(let [trialJobId, kubeflowTrialJob] of this.trialJobsMap) {
if (!kubeflowTrialJob) {
Expand All @@ -49,33 +51,30 @@ export class KubeflowJobInfoCollector {
if( Date.now() - kubeflowTrialJob.submitTime < 20 * 1000) {
return Promise.resolve();
}
updateKubeflowTrialJobs.push(this.retrieveSingleTrialJobInfo(kubeflowTrialJob))
updateKubeflowTrialJobs.push(this.retrieveSingleTrialJobInfo(operatorClient, kubeflowTrialJob))
}

await Promise.all(updateKubeflowTrialJobs);
}

private async retrieveSingleTrialJobInfo(kubeflowTrialJob : KubeflowTrialJobDetail) : Promise<void> {
private async retrieveSingleTrialJobInfo(operatorClient: KubeflowOperatorClient | undefined,
kubeflowTrialJob : KubeflowTrialJobDetail) : Promise<void> {
if (!this.statusesNeedToCheck.includes(kubeflowTrialJob.status)) {
return Promise.resolve();
}

let result : cpp.childProcessPromise.Result;
if(operatorClient === undefined) {
return Promise.reject('operatorClient is undefined');
}

let kubeflowJobInfo: any;
try {
result = await cpp.exec(`kubectl get ${kubeflowTrialJob.k8sPluralName} ${kubeflowTrialJob.kubeflowJobName} -o json`);
if(result.stderr) {
this.log.error(`Get ${kubeflowTrialJob.k8sPluralName} ${kubeflowTrialJob.kubeflowJobName} failed. Error is ${result.stderr}, failed checking number is ${kubeflowTrialJob.queryJobFailedCount}`);
kubeflowTrialJob.queryJobFailedCount++;
if(kubeflowTrialJob.queryJobFailedCount >= this.MAX_FAILED_QUERY_JOB_NUMBER) {
kubeflowTrialJob.status = 'UNKNOWN';
}
}
kubeflowJobInfo = await operatorClient.getKubeflowJob(kubeflowTrialJob.kubeflowJobName);
} catch(error) {
this.log.error(`kubectl get ${kubeflowTrialJob.k8sPluralName} ${kubeflowTrialJob.kubeflowJobName} failed, error is ${error}`);
this.log.error(`Get job ${kubeflowTrialJob.kubeflowJobName} info failed, error is ${error}`);
return Promise.resolve();
}

const kubeflowJobInfo = JSON.parse(result.stdout);
if(kubeflowJobInfo.status && kubeflowJobInfo.status.conditions) {
const latestCondition = kubeflowJobInfo.status.conditions[kubeflowJobInfo.status.conditions.length - 1];
const tfJobType : KubeflowTFJobType = <KubeflowTFJobType>latestCondition.type;
Expand Down
Loading

0 comments on commit d8e5516

Please sign in to comment.