Skip to content

Commit

Permalink
Fix keyframe bug (#64)
Browse files Browse the repository at this point in the history
* fix when log not created, try to tail the log can crash the server

* fix vertex error log not write to file, and keyframe not work for large video, failed metadata operation not correctly mark the job
  • Loading branch information
EverettSummer authored Oct 28, 2023
1 parent 3e757bd commit 299169a
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 83 deletions.
38 changes: 21 additions & 17 deletions src/JobManager/JobManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,8 @@ export class JobManager {
}
});

this._vm.events.on(EVENT_VERTEX_FAIL, async (error) => {
try {
this._jobLogger.error(error);
this._job.status = JobStatus.UnrecoverableError;
this._job = await jobRepo.save(this._job) as Job;
this.events.emit(JobManager.EVENT_JOB_FAILED, this._job.id);
this._jobLogger.error('Job failed with vertex failure');
this._jobLogger.info(LOG_END_FLAG);
} catch (err) {
this._jobLogger.error(err);
this._jobLogger.info(LOG_END_FLAG);
this._sentry.capture(err);
this.events.emit(JobManager.EVENT_JOB_FAILED, this._job.id);
}
this._vm.events.on(EVENT_VERTEX_FAIL, (error) => {
this.onVxFailed(error);
});

this._vm.events.on(TERMINAL_VERTEX_FINISHED, async () => {
Expand All @@ -125,10 +113,9 @@ export class JobManager {
this._jobLogger.info(LOG_END_FLAG);
}
} catch (error) {
this._jobLogger.error(error);
this._jobLogger.info(LOG_END_FLAG);
this._sentry.capture(error);
this.events.emit(JobManager.EVENT_JOB_FAILED, this._job.id);
await this.onVxFailed(error);
this._jobLogger.error(error);
}
});

Expand All @@ -139,6 +126,23 @@ export class JobManager {
}
}

private async onVxFailed(error:any): Promise<any> {
const jobRepo = this._databaseService.getJobRepository();
try {
this._jobLogger.error(error);
this._job.status = JobStatus.UnrecoverableError;
this._job = await jobRepo.save(this._job) as Job;
this.events.emit(JobManager.EVENT_JOB_FAILED, this._job.id);
this._jobLogger.error('Job failed with vertex failure');
this._jobLogger.info(LOG_END_FLAG);
} catch (err) {
this._jobLogger.error(err);
this._jobLogger.info(LOG_END_FLAG);
this._sentry.capture(err);
this.events.emit(JobManager.EVENT_JOB_FAILED, this._job.id);
}
}

/**
* Cancel current running job if possible
*/
Expand Down
53 changes: 24 additions & 29 deletions src/JobManager/JobMetadataHelperImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import { JobMetadataHelper } from './JobMetadataHelper';
import { readdir } from 'fs/promises';
import { StringDecoder } from 'string_decoder';

const COMMAND_TIMEOUT = 5000;
const COMMAND_TIMEOUT = 30 * 60 * 1000;

const TILE_SIZE = 10; // fixed tile size to avoid large image
const SCALE_HEIGHT = 120;
Expand Down Expand Up @@ -60,30 +60,22 @@ export class JobMetadataHelperImpl implements JobMetadataHelper {
throw new Error('No video output found!');
}
const metadata = new VideoOutputMetadata();
try {
const outputPath = videoVertex.outputPath;
const trackInfos = await getStreamsInfo(outputPath);
const container = new MediaContainer(trackInfos);
const videoStream = new VideoStream(container.getDefaultVideoStreamInfo());
const thumbnailPath = join(dirname(outputPath), `thumb-${basename(outputPath)}.png`);
jobLogger.info(`Generating thumbnail of the video at 00:00:01.000, output is ${thumbnailPath}`);
await this.runCommand('ffmpeg', ['-y','-ss', '00:00:01.000', '-i', outputPath, '-vframes','1', thumbnailPath], jobLogger);
jobLogger.info(`Thumbnail generated, getting dominant color of the thumbnail`);
const dominantColor = await getAverageColor(thumbnailPath, {algorithm: 'dominant'});
metadata.width = videoStream.getWidth();
metadata.height = videoStream.getHeight();
metadata.duration = container.getDuration() * 1000;
metadata.dominantColorOfThumbnail = dominantColor.hex;
metadata.thumbnailPath = thumbnailPath;
jobLogger.info('Generating keyframes preview tile');
await this.generatePreviewImage(outputPath, metadata, jobLogger);
} catch (ex) {
jobLogger.error(ex);
this._sentry.capture(ex);
}
if (!metadata) {
throw new Error('no metadata for this job, check the job logs for more information');
}
const outputPath = videoVertex.outputPath;
const trackInfos = await getStreamsInfo(outputPath);
const container = new MediaContainer(trackInfos);
const videoStream = new VideoStream(container.getDefaultVideoStreamInfo());
const thumbnailPath = join(dirname(outputPath), `thumb-${basename(outputPath)}.png`);
jobLogger.info(`Generating thumbnail of the video at 00:00:01.000, output is ${thumbnailPath}`);
await this.runCommand('ffmpeg', ['-y','-ss', '00:00:01.000', '-i', outputPath, '-vframes','1', thumbnailPath], jobLogger);
jobLogger.info(`Thumbnail generated, getting dominant color of the thumbnail`);
const dominantColor = await getAverageColor(thumbnailPath, {algorithm: 'dominant'});
metadata.width = videoStream.getWidth();
metadata.height = videoStream.getHeight();
metadata.duration = container.getDuration() * 1000;
metadata.dominantColorOfThumbnail = dominantColor.hex;
metadata.thumbnailPath = thumbnailPath;
jobLogger.info('Generating keyframes preview tile');
await this.generatePreviewImage(outputPath, metadata, jobLogger);
return metadata;
}

Expand All @@ -106,11 +98,13 @@ export class JobMetadataHelperImpl implements JobMetadataHelper {
const keyframeImagePath = join(imageDirPath, `${imageFilenameBase}-%3d.jpg`);
// generate tiles for key frames every 1 second
await this.runCommand('ffmpeg', ['-y', '-i', videoPath,
'-an',
'-vsync', '0',
'-vf',
`select=isnan(prev_selected_t)+gte(t-prev_selected_t\\,2),scale=${metaData.frameWidth}:${metaData.frameHeight},tile=${metaData.tileSize}x${metaData.tileSize}`,
'-an', '-vsync', '0', keyframeImagePath
keyframeImagePath
], jobLogger);

jobLogger.info('keyframes generated!');
const filenameList = await readdir(imageDirPath);
metaData.keyframeImagePathList = filenameList.filter(f => f.endsWith('.jpg') && f.startsWith(imageFilenameBase)).map(f => join(imageDirPath, f));
}
Expand All @@ -128,11 +122,12 @@ export class JobMetadataHelperImpl implements JobMetadataHelper {
logger.info(decoder.end(data));
});
child.stderr.on('data', (data) => {
logger.error(decoder.end(data));
logger.info(decoder.end(data));
});
child.on('close', (code) => {
logger.info('command finished, exit code = ' + code);
if (code !== 0) {
reject();
reject('ffmpeg command failed with non-0 exit code');
return;
}
resolve(undefined);
Expand Down
12 changes: 6 additions & 6 deletions src/JobManager/VertexManagerImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,17 @@ export class VertexManagerImpl implements VertexManager {
const vertex = vertexMap[vertexId];
const vertexLogger = this._vertexLoggerDict[vertexId];
if (vertex.status === VertexStatus.Running && this._runningVertexDict[vertexId]) {
vertexLogger.warn('trying to cancel vertex');
vertexLogger.info('trying to cancel vertex');
vertex.status = VertexStatus.Canceled;
allPromise.push(vertexRepo.save(vertex)
.then(() => {
return this._runningVertexDict[vertexId].videoProcessor.cancel()
})
.then(() => {
vertexLogger.warn('vertex canceled');
vertexLogger.info('vertex canceled');
})
.catch((error) => {
vertexLogger.error(error);
vertexLogger.info(error);
}));
}
});
Expand Down Expand Up @@ -258,7 +258,7 @@ export class VertexManagerImpl implements VertexManager {
vertex.videoProcessor = this._processorFactory(vertex.actionType);
vertex.videoProcessor.registerLogHandler((logChunk, ch) => {
if (ch === 'stderr') {
vertexLogger.error(logChunk);
vertexLogger.info(logChunk);
} else {
vertexLogger.info(logChunk);
}
Expand All @@ -284,7 +284,7 @@ export class VertexManagerImpl implements VertexManager {
await vertex.videoProcessor.dispose();
this.onVertexFinished(vertex.id);
} catch (error) {
vertexLogger.error(error);
vertexLogger.info(error);
this.onVertexError(vertex.id, error);
await vertex.videoProcessor.dispose();
} finally {
Expand Down Expand Up @@ -342,7 +342,7 @@ export class VertexManagerImpl implements VertexManager {
.catch((err) => {
// save error
this._sentry.capture(err);
this._vertexLoggerDict[vertexId].error(err);
this._vertexLoggerDict[vertexId].info(err);
this._vertexLoggerDict[vertexId].info(LOG_END_FLAG);
});
}
Expand Down
88 changes: 57 additions & 31 deletions src/api-service/log-streaming-helper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 IROHA LAB
* Copyright 2023 IROHA LAB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,44 +20,52 @@ import { getStdLogger, LOG_END_FLAG } from '../utils/Logger';
import { createInterface } from 'readline';
import { createReadStream } from 'fs';
import { stat } from 'fs/promises';
import { promisify } from 'util';

const sleep = promisify(setTimeout);
const logger = getStdLogger();

export function tailing(logPath: string, socket: Socket, breakAtEndFlag: boolean): void {
const tail = new Tail(logPath, {
fromBeginning: true,
flushAtEOF: true
});
tail.on('line', (line) => {
if (line) {
try {
const lineDict = JSON.parse(line);
if (lineDict.msg === LOG_END_FLAG) {
socket.emit('log:line_end', 'end of the log');
if (breakAtEndFlag) {
tail.unwatch();
waitForFileCreation(logPath)
.then(() => {
const tail = new Tail(logPath, {
fromBeginning: true,
flushAtEOF: true
});
tail.on('line', (line) => {
if (line) {
try {
const lineDict = JSON.parse(line);
if (lineDict.msg === LOG_END_FLAG) {
socket.emit('log:line_end', 'end of the log');
if (breakAtEndFlag) {
tail.unwatch();
}
// socket.disconnect();
return;
}
} catch (err) {
logger.error(err);
}
// socket.disconnect();
return;
}
} catch (err) {
logger.error(err);
}
}
socket.emit('log:line', line);
});
socket.emit('log:line', line);
});

tail.on('error', (error) => {
logger.error(error);
socket.emit('error', error);
socket.disconnect();
});
tail.on('error', (error) => {
logger.error(error);
socket.emit('error', error);
socket.disconnect();
});

socket.on('disconnect', (reason) => {
if (tail) {
tail.unwatch();
}
});
socket.on('disconnect', (reason) => {
if (tail) {
tail.unwatch();
}
});
})
.catch((e) => {
logger.error(e);
});
}

export function readToEnd(logPath: string, socket: Socket): void {
Expand Down Expand Up @@ -95,4 +103,22 @@ export async function isFileExists(logPath: string): Promise<boolean> {
} catch (e) {
return false;
}
}

async function waitForFileCreation(filePath: string, waitCount = 0): Promise<any> {
try {
const fileStats = await stat(filePath);
} catch (err: any) {
if (err.code === 'ENOENT') {
if (waitCount < 10) {
await sleep(5000);
return await waitForFileCreation(filePath, waitCount + 1);
} else {
throw new Error('max wait time reached, but file still not exists');
}
} else {
throw err;
}
}

}

0 comments on commit 299169a

Please sign in to comment.