Skip to content

Commit

Permalink
Merge pull request #256 from squirrelsc/2391-improve
Browse files Browse the repository at this point in the history
Some small fix and merge logic
  • Loading branch information
SparkSnail authored Jun 30, 2020
2 parents fec8a67 + 478629f commit c299ce1
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@

'use strict';

import * as component from "../../../common/component";
import { EventEmitter } from 'events';
import { delay } from "../../../common/utils";
import { CommandChannel, RunnerConnection } from "../commandChannel";
import { EnvironmentInformation, Channel } from "../environment";
import { AMLEnvironmentInformation } from '../aml/amlConfig';
import { EventEmitter } from 'events';
import { AMLEnvironmentService } from "../environments/amlEnvironmentService";
import { STDOUT } from "../../../core/commands";
import { CommandChannel, RunnerConnection } from "../commandChannel";
import { Channel, EnvironmentInformation } from "../environment";

/**
 * AML-specific subclass of RunnerConnection.
 * It declares no members of its own; everything is inherited from the base class.
 */
class AMLRunnerConnection extends RunnerConnection {}
Expand All @@ -19,7 +16,6 @@ export class AMLCommandChannel extends CommandChannel {
private stopping: boolean = false;
private currentMessageIndex: number = -1;
private sendQueues: [EnvironmentInformation, string][] = [];
private metricEmitter: EventEmitter | undefined;
private readonly NNI_METRICS_PATTERN: string = `NNISDK_MEb'(?<metrics>.*?)'`;

public constructor(commandEmitter: EventEmitter) {
Expand All @@ -30,23 +26,25 @@ export class AMLCommandChannel extends CommandChannel {
}

public async config(_key: string, _value: any): Promise<void> {
switch (_key) {
case "MetricEmitter":
this.metricEmitter = _value as EventEmitter;
break;
}
// do nothing
}

public async start(): Promise<void> {
// start command loops
this.receiveLoop();
this.sendLoop();
// do nothing
}

/**
 * Marks this channel as stopping by setting the `stopping` flag.
 * NOTE(review): presumably receiveLoop/sendLoop poll this flag to exit — their bodies are not visible here, confirm.
 */
public async stop(): Promise<void> {
this.stopping = true;
}

/**
 * Starts the receive loop and the send loop, then awaits both of them together.
 * The returned promise therefore settles only after Promise.all over the two loops settles.
 */
public async run(): Promise<void> {
    // start command loops
    const receiving = this.receiveLoop();
    const sending = this.sendLoop();
    await Promise.all([receiving, sending]);
}

/**
 * Queues an outgoing message for the given environment.
 * Nothing is transmitted here; the [environment, message] pair is only appended to `sendQueues`.
 *
 * @param environment environment the message is addressed to
 * @param message message text to enqueue
 */
protected async sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void> {
    const queued: [EnvironmentInformation, string] = [environment, message];
    this.sendQueues.push(queued);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import * as component from "../../../common/component";
import { delay } from "../../../common/utils";
import { CommandChannel, RunnerConnection } from "../commandChannel";
import { EnvironmentInformation, Channel } from "../environment";
import { Channel, EnvironmentInformation } from "../environment";
import { StorageService } from "../storageService";

class FileHandler {
Expand Down Expand Up @@ -38,15 +38,21 @@ export class FileCommandChannel extends CommandChannel {
}

public async start(): Promise<void> {
// start command loops
this.receiveLoop();
this.sendLoop();
// do nothing
}

/**
 * Marks this channel as stopping by setting the `stopping` flag.
 * NOTE(review): presumably receiveLoop/sendLoop poll this flag to exit — their bodies are not visible here, confirm.
 */
public async stop(): Promise<void> {
this.stopping = true;
}

/**
 * Starts the receive loop and the send loop, then awaits both of them together.
 * The returned promise therefore settles only after Promise.all over the two loops settles.
 */
public async run(): Promise<void> {
    // start command loops
    const receiving = this.receiveLoop();
    const sending = this.sendLoop();
    await Promise.all([receiving, sending]);
}

/**
 * Queues an outgoing message for the given environment.
 * Nothing is transmitted here; the [environment, message] pair is only appended to `sendQueues`.
 *
 * @param environment environment the message is addressed to
 * @param message message text to enqueue
 */
protected async sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void> {
this.sendQueues.push([environment, message]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ export class WebCommandChannel extends CommandChannel {
}
}

/**
 * Intentionally empty implementation of run(): this channel starts no loop here
 * and the returned promise resolves immediately.
 */
public async run(): Promise<void>{
// do nothing
}

protected async sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void> {
if (this.webSocketServer === undefined) {
throw new Error(`WebCommandChannel: uninitialized!`)
Expand Down
3 changes: 3 additions & 0 deletions src/nni_manager/training_service/reusable/commandChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ export abstract class CommandChannel {
public abstract start(): Promise<void>;
public abstract stop(): Promise<void>;

// Pull-based command channels need a loop that polls for messages; implementations should start that loop here and await it.
public abstract run(): Promise<void>;

protected abstract sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void>;
protected abstract createRunnerConnection(environment: EnvironmentInformation): RunnerConnection;

Expand Down
36 changes: 18 additions & 18 deletions src/nni_manager/training_service/reusable/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,6 @@ import { CommandChannel } from "./commandChannel";
export type EnvironmentStatus = 'UNKNOWN' | 'WAITING' | 'RUNNING' | 'SUCCEEDED' | 'FAILED' | 'USER_CANCELED';
export type Channel = "web" | "file" | "aml" | "ut";

/**
 * Abstract base for environment services.
 * Subclasses implement configuration and the environment lifecycle operations;
 * this class supplies default factories for the command channel and the
 * environment information object.
 */
export abstract class EnvironmentService {

    /**
     * NOTE(review): presumably true when a StorageService is available for this
     * service — confirm against the implementing classes.
     */
    public abstract get hasStorageService(): boolean;

    /** Applies one configuration entry, identified by `key`, to the service. */
    public abstract config(key: string, value: string): Promise<void>;

    /** Refreshes the status of the given environments. */
    public abstract refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void>;

    /** Starts the given environment. */
    public abstract startEnvironment(environment: EnvironmentInformation): Promise<void>;

    /** Stops the given environment. */
    public abstract stopEnvironment(environment: EnvironmentInformation): Promise<void>;

    /**
     * Creates the command channel for this service.
     * The default implementation returns a new WebCommandChannel bound to `commandEmitter`.
     */
    public getCommandChannel(commandEmitter: EventEmitter): CommandChannel {
        const channel = new WebCommandChannel(commandEmitter);
        return channel;
    }

    /**
     * Creates the EnvironmentInformation for an environment id and name.
     * The misspelled method name is kept as-is because callers reference it.
     */
    public createEnviornmentInfomation(envId: string, envName: string): EnvironmentInformation {
        const information = new EnvironmentInformation(envId, envName);
        return information;
    }
}

export class NodeInfomation {
public id: string;
public status: TrialJobStatus = "UNKNOWN";
Expand Down Expand Up @@ -110,3 +92,21 @@ export class EnvironmentInformation {
}
}
}

/**
 * Abstract base for environment services.
 * Subclasses implement configuration and the environment lifecycle operations;
 * this class supplies default factories for the command channel and the
 * environment information object.
 */
export abstract class EnvironmentService {

    /**
     * NOTE(review): presumably true when a StorageService is available for this
     * service — confirm against the implementing classes.
     */
    public abstract get hasStorageService(): boolean;

    /** Applies one configuration entry, identified by `key`, to the service. */
    public abstract config(key: string, value: string): Promise<void>;

    /** Refreshes the status of the given environments. */
    public abstract refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void>;

    /** Starts the given environment. */
    public abstract startEnvironment(environment: EnvironmentInformation): Promise<void>;

    /** Stops the given environment. */
    public abstract stopEnvironment(environment: EnvironmentInformation): Promise<void>;

    /**
     * Creates the command channel for this service.
     * The default implementation returns a new WebCommandChannel bound to `commandEmitter`.
     */
    public getCommandChannel(commandEmitter: EventEmitter): CommandChannel {
        const channel = new WebCommandChannel(commandEmitter);
        return channel;
    }

    /**
     * Creates the EnvironmentInformation for an environment id and name.
     * The misspelled method name is kept as-is because callers reference it.
     */
    public createEnviornmentInfomation(envId: string, envName: string): EnvironmentInformation {
        const information = new EnvironmentInformation(envId, envName);
        return information;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ export class OpenPaiEnvironmentService extends EnvironmentService {
}

// Step 1. Prepare PAI job configuration
environment.runnerWorkingFolder = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/envs/${environment.id}`;
environment.command = `cd ${environment.runnerWorkingFolder} && ${environment.command}`
const environmentRoot = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}`;
environment.runnerWorkingFolder = `${environmentRoot}/envs/${environment.id}`;
environment.command = `cd ${environmentRoot} && ${environment.command}`
environment.trackingUrl = `${this.protocol}://${this.paiClusterConfig.host}/job-detail.html?username=${this.paiClusterConfig.userName}&jobName=${environment.jobId}`

// Step 2. Generate Job Configuration in yaml format
Expand Down
80 changes: 36 additions & 44 deletions src/nni_manager/training_service/reusable/trialDispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import * as path from 'path';
import { Writable } from 'stream';
import { String } from 'typescript-string-operations';
import * as component from '../../common/component';
import { getExperimentId, getPlatform, getBasePort } from '../../common/experimentStartupInfo';
import { getBasePort, getExperimentId, getPlatform } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import { NNIManagerIpConfig, TrainingService, TrialJobApplicationForm, TrialJobMetric, TrialJobStatus } from '../../common/trainingService';
import { delay, getExperimentRootDir, getLogLevel, getVersion, mkDirPSync, uniqueString } from '../../common/utils';
Expand All @@ -18,11 +18,10 @@ import { GPUSummary } from '../../training_service/common/gpuData';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { validateCodeDir, execMkdir, execCopydir, tarAdd } from '../common/util';
import { WebCommandChannel } from './channels/webCommandChannel';
import { AMLCommandChannel } from './channels/amlCommandChannel';
import { validateCodeDir } from '../common/util';
import { Command, CommandChannel } from './commandChannel';
import { EnvironmentInformation, EnvironmentService, NodeInfomation, RunnerSettings } from './environment';
import { MountedStorageService } from './storages/mountedStorageService';
import { StorageService } from './storageService';
import { TrialDetail } from './trial';

Expand Down Expand Up @@ -169,59 +168,58 @@ class TrialDispatcher implements TrainingService {
this.runnerSettings.commandChannel = this.commandChannel.channelName;

// for AML channel, other channels can ignore this.
this.commandChannel.config("MetricEmitter", this.metricsEmitter);
await this.commandChannel.config("MetricEmitter", this.metricsEmitter);

// start channel
this.commandEmitter.on("command", (command: Command): void => {
this.handleCommand(command).catch((err: Error) => {
this.log.error(`TrialDispatcher: error on handle env ${command.environment.id} command: ${command.command}, data: ${command.data}, error: ${err}`);
})
});
this.commandChannel.start();
await this.commandChannel.start();
this.log.info(`TrialDispatcher: started channel: ${this.commandChannel.constructor.name}`);

if (this.trialConfig === undefined) {
throw new Error(`trial config shouldn't be undefined in run()`);
}

this.log.info(`TrialDispatcher: copying code and settings.`);
let storageService: StorageService;
if (environmentService.hasStorageService) {
this.log.info(`TrialDispatcher: copying code and settings.`);
const storageService = component.get<StorageService>(StorageService);
// Copy the compressed file to remoteDirectory and delete it
const codeDir = path.resolve(this.trialConfig.codeDir);
const envDir = storageService.joinPath("envs");
const codeFileName = await storageService.copyDirectory(codeDir, envDir, true);
storageService.rename(codeFileName, "nni-code.tar.gz");

const installFileName = storageService.joinPath(envDir, 'install_nni.sh');
await storageService.save(CONTAINER_INSTALL_NNI_SHELL_FORMAT, installFileName);

const runnerSettings = storageService.joinPath(envDir, "settings.json");
await storageService.save(JSON.stringify(this.runnerSettings), runnerSettings);

if (this.isDeveloping) {
let trialToolsPath = path.join(__dirname, "../../../../../tools/nni_trial_tool");
if (false === fs.existsSync(trialToolsPath)) {
trialToolsPath = path.join(__dirname, "..\\..\\..\\..\\..\\tools\\nni_trial_tool");
}
await storageService.copyDirectory(trialToolsPath, envDir, true);
}
this.log.debug(`TrialDispatcher: use existing storage service.`);
storageService = component.get<StorageService>(StorageService);
} else {
//write configuration to local folder, for AML
let environmentLocalTempFolder = path.join(this.experimentRootDir, this.experimentId, "environment-temp", "envs");
await execMkdir(environmentLocalTempFolder);
const runnerSettingsPath = path.join(environmentLocalTempFolder, "settings.json");
this.runnerSettings.command = this.trialConfig.command;
await fs.promises.writeFile(runnerSettingsPath, JSON.stringify(this.runnerSettings), { encoding: 'utf8' });
const installFilePath = path.join(environmentLocalTempFolder, "install_nni.sh");
await fs.promises.writeFile(installFilePath, CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });
await tarAdd(path.join(environmentLocalTempFolder, 'nni-code.tar.gz'), this.trialConfig.codeDir);
this.log.debug(`TrialDispatcher: create temp storage service to temp folder.`);
storageService = new MountedStorageService();
const environmentLocalTempFolder = path.join(this.experimentRootDir, this.experimentId, "environment-temp");
storageService.initialize(this.trialConfig.codeDir, environmentLocalTempFolder);
}

// Copy the compressed file to remoteDirectory and delete it
const codeDir = path.resolve(this.trialConfig.codeDir);
const envDir = storageService.joinPath("envs");
const codeFileName = await storageService.copyDirectory(codeDir, envDir, true);
storageService.rename(codeFileName, "nni-code.tar.gz");

const installFileName = storageService.joinPath(envDir, 'install_nni.sh');
await storageService.save(CONTAINER_INSTALL_NNI_SHELL_FORMAT, installFileName);

const runnerSettings = storageService.joinPath(envDir, "settings.json");
await storageService.save(JSON.stringify(this.runnerSettings), runnerSettings);

if (this.isDeveloping) {
let trialToolsPath = path.join(__dirname, "../../../../../tools/nni_trial_tool");
if (false === fs.existsSync(trialToolsPath)) {
trialToolsPath = path.join(__dirname, "..\\..\\..\\..\\..\\tools\\nni_trial_tool");
}
await storageService.copyDirectory(trialToolsPath, envDir, true);
}

this.log.info(`TrialDispatcher: run loop started.`);
await Promise.all([
this.environmentMaintenanceLoop(),
this.trialManagementLoop(),
this.commandChannel.run(),
]);
}

Expand Down Expand Up @@ -288,7 +286,7 @@ class TrialDispatcher implements TrainingService {
}

this.commandEmitter.off("command", this.handleCommand);
this.commandChannel.stop();
await this.commandChannel.stop();
}

private async environmentMaintenanceLoop(): Promise<void> {
Expand Down Expand Up @@ -454,13 +452,7 @@ class TrialDispatcher implements TrainingService {
environment.command = "[ -d \"nni_trial_tool\" ] && echo \"nni_trial_tool exists already\" || (mkdir ./nni_trial_tool && tar -xof ../nni_trial_tool.tar.gz -C ./nni_trial_tool) && pip3 install websockets && " + environment.command;
}

if (environmentService.hasStorageService) {
const storageService = component.get<StorageService>(StorageService);
environment.workingFolder = storageService.joinPath("envs", envId);
await storageService.createDirectory(environment.workingFolder);
} else {
environment.command = `mkdir envs/${envId} && cd envs/${envId} && ${environment.command}`;
}
environment.command = `mkdir -p envs/${envId} && cd envs/${envId} && ${environment.command}`;

await environmentService.startEnvironment(environment);
this.environments.set(environment.id, environment);
Expand Down

0 comments on commit c299ce1

Please sign in to comment.