feat(general): show current progress/file during validation
Closes #142
buchslava committed Jul 3, 2017
parent 616cda7 · commit 29a8af9
Showing 12 changed files with 242 additions and 91 deletions.
package.json (2 additions, 2 deletions)

@@ -12,7 +12,7 @@
     "test-travis": "istanbul cover mocha _mocha -- -R spec --timeout 200000 --compilers ts:ts-node/register --recursive test/**/*.spec.ts && codecov",
     "changelog": "conventional-changelog -i CHANGELOG.md -s -p angular",
     "github-release": "conventional-github-releaser -p angular",
-    "build": "tsc && touch lib/package.json && echo \\{\\\"version\\\": \\\"1.7.6\\\"\\} > lib/package.json",
+    "build": "tsc && touch lib/package.json && echo \\{\\\"version\\\": \\\"1.8.0\\\"\\} > lib/package.json",
     "prepublish": "npm run build",
     "preversion": "npm test",
     "version": "npm run changelog && git add CHANGELOG.md",

@@ -44,7 +44,7 @@
     "deep-diff": "0.3.4",
     "fast-csv": "2.4.0",
     "fs": "0.0.2",
-    "jsonexport": "^1.5.2",
+    "jsonexport": "1.5.2",
     "levenshtein": "1.0.5",
     "lodash": "4.17.4",
     "md5": "2.2.1",
src/cli-logic.ts (11 additions, 2 deletions)

@@ -3,6 +3,7 @@ import * as fs from 'fs';
 import * as path from 'path';
 import { isString } from 'lodash';
 import { logger, settings, ddfRootFolder } from './utils';
+import { validationTransport } from './utils/logger';
 import { getRulesInformation } from './ddf-rules/registry';
 import { DataPackage } from './data/data-package';
 import { DdfJsonCorrector } from './ddf-definitions/ddf-json-corrector';

@@ -89,9 +90,11 @@ if (!isValidationExpected) {
 if (isValidationExpected) {
   const validator = new StreamValidator(ddfRootFolder, settings);
 
-  logger.notice('[');
+  let hasIssue = false;
 
   validator.on('issue', (issue: any) => {
+    hasIssue = true;
+
     if (isString(issue)) {
       logger.notice(issue + '\n');
     }

@@ -106,7 +109,13 @@
       throw err;
     }
 
-    logger.notice('{}]\n');
+    if (settings.progress && hasIssue) {
+      console.log(`\nValidation was finished with issues. Details are here: ${validationTransport.file}.`);
+    }
+
+    if (settings.progress && !hasIssue) {
+      console.log('\nValidation was finished successfully.');
+    }
 
    checkLatestVersion(localPackage.version);
  });
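Note: the final messages above read validationTransport.file, and later hunks call logger.progressInit() and logger.progress(); both come from src/utils and src/utils/logger, which are not among the hunks loaded on this page (settings.progress is presumably the switch that turns the new output on). The sketch below is only a guess at the surface those calls assume; every name in it is an assumption, not the project's actual code.

    // Hypothetical sketch of the progress API this commit relies on; the real
    // implementation lives in src/utils/logger, which is not shown here.
    interface ProgressOptions {
      total: number;
    }

    class ProgressLogger {
      private label = '';
      private total = 0;
      private current = 0;

      // Begin a named phase with a known step count, e.g. 'root checking'.
      progressInit(label: string, options: ProgressOptions): void {
        this.label = label;
        this.total = options.total;
        this.current = 0;
      }

      // Advance one step and redraw "label: current/total" on the same line.
      progress(): void {
        this.current += 1;
        process.stdout.write(`\r${this.label}: ${this.current}/${this.total}`);

        if (this.current >= this.total) {
          process.stdout.write('\n');
        }
      }
    }

    export const logger = new ProgressLogger();

    // Issues are written to a file transport so the console stays free for the
    // progress line; cli-logic.ts prints this path when issues were found.
    export const validationTransport = {file: 'validation.log'};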
src/data/csv-checker.ts (1 addition, 1 deletion)

@@ -14,7 +14,7 @@ const getErrors = parsedCsv => parsedCsv.errors
 export class CsvChecker {
   public filePath: string;
   public error: any;
-  public errors: Array<any> = [];
+  public errors: any[] = [];
 
   constructor(filePath) {
     this.filePath = filePath;
src/data/data-package.ts (11 additions, 7 deletions)

@@ -126,6 +126,7 @@ export class DataPackage {
   public errors: any[];
   public warnings: any[];
   public fileDescriptors: IDdfFileDescriptor[];
+  public fileDescriptorsHash: Map<string, IDdfFileDescriptor>;
   public dataPackageContent: any;
   public translationFolders: any[];
   public db: Db;

@@ -328,8 +329,7 @@
       path: `${getRelativeDir(fileDescriptor.fullPath)}${fileDescriptor.filename}`,
       name: `${stripDdfPrefix(fileDescriptor.name)}${getNameSuffix(fileDescriptor.directoryIndex)}`,
       schema: {
-        fields: (fileDescriptor.headers || [])
-          .map(header => prepareField(header, fileDescriptor)),
+        fields: (fileDescriptor.headers || []).map(header => prepareField(header, fileDescriptor)),
         primaryKey: fileDescriptor.primaryKey
       }
     }))

@@ -339,11 +339,15 @@
   getType(filename) {
     const normalizedFileName = path.resolve(this.rootFolder, filename);
 
-    return head(
-      this.fileDescriptors
-        .filter(fileDescriptor => path.resolve(this.rootFolder, fileDescriptor.fullPath) === normalizedFileName)
-        .map(fileDescriptor => fileDescriptor.type)
-    );
+    if (!this.fileDescriptorsHash) {
+      this.fileDescriptorsHash = new Map<string, IDdfFileDescriptor>();
+
+      this.fileDescriptors.forEach(fileDescriptor => {
+        this.fileDescriptorsHash.set(path.resolve(this.rootFolder, fileDescriptor.fullPath), fileDescriptor);
+      });
+    }
+
+    return this.fileDescriptorsHash.get(normalizedFileName).type;
   }
 
   build(onDataPackageReady) {
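Note: the getType() rewrite above trades a per-call scan of all file descriptors for a Map keyed by resolved path, built lazily on the first call, so repeated lookups drop from O(n) to O(1). One behavioral nuance: the old head(...) returned undefined for an unknown file, while the new .get(normalizedFileName).type throws if the path is absent from the map. A defensive variant (hypothetical, not part of this commit) would keep the old semantics:

    // Hypothetical guard, not in the commit: preserve the old undefined-on-miss
    // behavior instead of throwing when the file is unknown.
    const descriptor = this.fileDescriptorsHash.get(normalizedFileName);

    return descriptor ? descriptor.type : undefined;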
src/data/ddf-root.ts (5 additions, 4 deletions)

@@ -5,6 +5,7 @@ import { resolve } from 'path';
 import { TRNSLATIONS_FOLDER } from '../ddf-definitions/constants';
 import { DataPackage } from './data-package';
 import { FileDescriptor } from './file-descriptor';
+import { logger } from '../utils';
 
 const PROCESS_LIMIT = 30;
 

@@ -75,13 +76,13 @@ export class DDFRoot {
 
     this.fileDescriptors = expectedResources.map(ddfResource => this.getFileDescriptor(this.path, ddfResource));
 
-    const actionsCsv = this.fileDescriptors.map(fileDescriptor => onFileChecked =>
-      fileDescriptor.csvChecker.check(onFileChecked));
-    const actionsForDescriptor = this.fileDescriptors.map(fileDescriptor =>
+    logger.progressInit('root checking', {total: this.fileDescriptors.length});
+
+    const actions = this.fileDescriptors.map(fileDescriptor =>
       onFileChecked => fileDescriptor.check(onFileChecked));
 
     parallelLimit(
-      actionsCsv.concat(actionsForDescriptor),
+      actions,
       PROCESS_LIMIT,
       checkErr => {
         if (checkErr) {
src/data/file-descriptor.ts (7 additions, 1 deletion)

@@ -4,6 +4,7 @@ import { stat } from 'fs';
 import { getFileLine } from '../utils/file';
 import { INCORRECT_FILE } from '../ddf-rules/registry';
 import { CsvChecker } from './csv-checker';
+import { logger } from '../utils';
 
 const PROCESS_LIMIT = 30;
 

@@ -102,11 +103,16 @@
         }
 
         if (isEmpty(this.issues)) {
-          this.csvChecker.check(() => onFileDescriptorChecked());
+          this.csvChecker.check(() => {
+            logger.progress();
+            onFileDescriptorChecked();
+          });
 
           return;
         }
 
+        logger.progress();
+
         onFileDescriptorChecked(this.issues);
       });
     });
src/ddf-definitions/ddf-data-set.ts (32 additions, 25 deletions)

@@ -7,6 +7,7 @@ import { DataPoint } from './data-point';
 import { Db } from '../data/db';
 import { DDFRoot } from '../data/ddf-root';
 import { DataPackage } from '../data/data-package';
+import { logger } from '../utils';
 
 export class DdfDataSet {
   public db: Db;

@@ -30,33 +31,39 @@
   };
 
   const processFileDescriptors = (ddfRoot: DDFRoot) => {
-    ddfRoot.fileDescriptors
-      .filter(fileDescriptor => isEmpty(fileDescriptor.issues))
-      .forEach(fileDescriptor => {
-        if (fileDescriptor.is(DATA_POINT)) {
-          loaders.push(onFileLoaded => {
-            fileDescriptor.fillHeaders(() => {
-              this.expectedClass[fileDescriptor.type].addDescriptors(fileDescriptor, ddfRoot.dataPackageDescriptor);
-              onFileLoaded();
-            });
-          });
-        }
+    const expectedFileDescriptors = ddfRoot.fileDescriptors.filter(fileDescriptor => isEmpty(fileDescriptor.issues));
+
+    logger.progressInit('dataset loading', {total: expectedFileDescriptors.length});
+
+    expectedFileDescriptors.forEach(fileDescriptor => {
+      if (fileDescriptor.is(DATA_POINT)) {
+        loaders.push(onFileLoaded => {
+          fileDescriptor.fillHeaders(() => {
+            this.expectedClass[fileDescriptor.type].addDescriptors(fileDescriptor, ddfRoot.dataPackageDescriptor);
+
+            logger.progress();
+
+            onFileLoaded();
+          });
+        });
+      }
 
-        if (fileDescriptor.is([CONCEPT, ENTITY])) {
-          loaders.push(onFileLoaded => {
-            fileDescriptor.fillHeaders(() => {
-              this.db.fillCollection(
-                Symbol.keyFor(fileDescriptor.type),
-                fileDescriptor.fullPath,
-                fileErr => {
-                  this.expectedClass[fileDescriptor.type].addFileDescriptor(fileDescriptor);
-                  this.expectedClass[fileDescriptor.type].getTranslationsData(translationsErr =>
-                    onFileLoaded(fileErr || translationsErr));
-                }, false);
-            });
-          });
-        }
-      });
+      if (fileDescriptor.is([CONCEPT, ENTITY])) {
+        loaders.push(onFileLoaded => {
+          fileDescriptor.fillHeaders(() => {
+            this.db.fillCollection(
+              Symbol.keyFor(fileDescriptor.type),
+              fileDescriptor.fullPath,
+              fileErr => {
+                this.expectedClass[fileDescriptor.type].addFileDescriptor(fileDescriptor);
+                this.expectedClass[fileDescriptor.type].getTranslationsData(translationsErr => onFileLoaded(fileErr || translationsErr));
+
+                logger.progress();
+              }, false);
+          });
+        });
+      }
+    });
   };
 
   if (this.ddfRoot.isDDF) {
src/index.ts (35 additions, 10 deletions)

@@ -1,7 +1,7 @@
 import * as path from 'path';
 import { EventEmitter } from 'events';
 import { parallelLimit } from 'async';
-import { isEmpty, flattenDeep, concat, isArray } from 'lodash';
+import { isEmpty, flattenDeep, concat, isArray, compact } from 'lodash';
 import { DdfDataSet } from './ddf-definitions/ddf-data-set';
 import { allRules as ddfRules } from './ddf-rules';
 import { IssuesFilter } from './utils/issues-filter';

@@ -17,6 +17,7 @@ import {
   getDataPointFileChunks,
   createRecordBasedRulesProcessor
 } from './shared';
+import { logger } from './utils';
 
 const child_process = require('child_process');
 const os = require('os');

@@ -29,12 +30,21 @@ function clearGlobalStates() {
 }
 
 function processSimpleRules(context, onIssue) {
-  Object.getOwnPropertySymbols(ddfRules)
-    .filter(key => isSimpleRule(ddfRules[key]))
-    .filter(key => context.issuesFilter.isAllowed(key))
-    .map(key => ddfRules[key].rule(context.ddfDataSet))
-    .filter(issues => !isEmpty(issues))
-    .forEach(issue => onIssue(issue));
+  const rulesKeys = Object.getOwnPropertySymbols(ddfRules).filter(key => isSimpleRule(ddfRules[key]) && context.issuesFilter.isAllowed(key));
+
+  for (let key of rulesKeys) {
+    const issues = ddfRules[key].rule(context.ddfDataSet);
+
+    if (!isEmpty(issues)) {
+      if (isArray(issues)) {
+        issues.forEach(issue => onIssue(issue));
+      }
+
+      if (!isArray(issues)) {
+        onIssue(issues)
+      }
+    }
+  }
 }
 
 export function createRecordBasedRuleProcessor(context, fileDescriptor, resultHandler) {

@@ -128,7 +138,7 @@ export class JSONValidator {
       childProcess.on('message', (message) => {
         childProcessesFinished++;
 
-        this.out = this.out.concat(message.out);
+        this.out = compact(this.out.concat(message.out));
 
         if (childProcessesFinished === cpuCount) {
           this.issueEmitter.emit('finish', message.err, this.out);

@@ -174,7 +184,9 @@
     }
 
     if (!this.settings.isMultithread) {
-      parallelLimit(getValidationActions(this), CONCURRENT_OPERATIONS_AMOUNT, err => {
+      const actions = getValidationActions(this);
+
+      parallelLimit(actions, CONCURRENT_OPERATIONS_AMOUNT, err => {
         this.issueEmitter.emit('finish', err, this.out);
       });
     }

@@ -202,6 +214,8 @@ export class StreamValidator {
       if (!isEmpty(result)) {
         result.map(issue => this.issueEmitter.emit('issue', issue.view()));
       }
+    }, () => {
+      logger.progress();
     });
   }
 

@@ -211,6 +225,9 @@
 
   multiThreadProcessing() {
     const fileChunks = getDataPointFileChunks(this.ddfDataSet, cpuCount);
+    const total = fileChunks.reduce((result, chunk) => result + chunk.length, 0);
+
+    logger.progressInit('datapoints validation', {total});
 
     let childProcessesFinished = 0;
 

@@ -222,6 +239,10 @@
         childProcessesFinished++;
       }
 
+      if (message.progress) {
+        logger.progress();
+      }
+
       if (!message.finish && message.issue) {
         this.issueEmitter.emit('issue', message.issue);
       }

@@ -270,7 +291,11 @@
     }
 
     if (!this.settings.isMultithread) {
-      parallelLimit(getValidationActions(this), CONCURRENT_OPERATIONS_AMOUNT, err => {
+      const actions = getValidationActions(this);
+
+      logger.progressInit('datapoints validation', {total: actions.length});
+
+      parallelLimit(actions, CONCURRENT_OPERATIONS_AMOUNT, err => {
         this.issueEmitter.emit('finish', err);
       });
     }
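Note: in multiThreadProcessing() the parent now reacts to a third message kind, {progress}, alongside {issue} and {finish}. The worker module that sends these messages is not among the loaded hunks, so the following is only a hedged sketch of the child-process half of that contract; the message shapes are inferred from the parent's handler above.

    // Hypothetical worker-side counterpart (the real child module is not shown
    // on this page). Shapes inferred from the parent: {progress: true},
    // {issue: ...}, and {finish: true, err, out}.
    process.on('message', (filesToValidate: string[]) => {
      const out: any[] = [];

      for (const file of filesToValidate) {
        // ...validate one datapoint file here, pushing any issues into `out`...

        if (process.send) {
          process.send({progress: true}); // one tick per processed file
        }
      }

      if (process.send) {
        process.send({finish: true, err: null, out});
      }
    });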
src/shared.ts (14 additions, 3 deletions)

@@ -1,4 +1,4 @@
-import { compact, isArray, sortBy } from 'lodash';
+import { compact, isArray, sortBy, flattenDeep } from 'lodash';
 import { allRules as ddfRules } from './ddf-rules';
 import { FileDescriptor } from './data/file-descriptor';
 import { DdfDataSet } from './ddf-definitions/ddf-data-set';

@@ -43,7 +43,13 @@
 export const toArray = value => compact(isArray(value) ? value : [value]);
 export const getDataPointFileChunks = (ddfDataSet: DdfDataSet, cpuCount: number): string[] => {
   const fileChunks = [];
-  const filesSortedBySize = sortBy(ddfDataSet.getDataPoint().fileDescriptors, ['size'])
+  const expectedFileDescriptors = ddfDataSet.getDataPoint().fileDescriptors;
+  const translationFileDescriptors = flattenDeep(ddfDataSet.getDataPoint().fileDescriptors.map(fileDescriptor =>
+    fileDescriptor.getExistingTranslationDescriptors()));
+
+  expectedFileDescriptors.push(...translationFileDescriptors);
+
+  const filesSortedBySize = sortBy(expectedFileDescriptors, ['size'])
     .map((fileDescriptor: FileDescriptor) => fileDescriptor.fullPath);
 
   for (let index = 0; index < cpuCount; index++) {

@@ -61,7 +67,7 @@ export const getDataPointFileChunks = (ddfDataSet: DdfDataSet, cpuCount: number)
   return fileChunks;
 };
 
-export function createRecordBasedRulesProcessor(context, fileDescriptor, resultHandler) {
+export function createRecordBasedRulesProcessor(context, fileDescriptor, resultHandler, progressHandler?) {
   const ddfDataSet = context.ddfDataSet;
 
   return onDataPointReady => {

@@ -70,6 +76,11 @@
       createRecordAggregationProcessor(context, ddfDataSet, fileDescriptor, resultHandler),
       () => {
         processAggregation(context, ddfDataSet, fileDescriptor, resultHandler);
+
+        if (progressHandler) {
+          progressHandler();
+        }
+
         onDataPointReady();
       }
     );
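Note: making progressHandler an optional fourth parameter keeps every pre-existing call site of createRecordBasedRulesProcessor source-compatible; only progress-aware callers (such as the StreamValidator hunk in src/index.ts above) pass the extra callback. A hypothetical call site, with context, fileDescriptor and issueEmitter standing in for values the surrounding validator code would supply:

    // Hypothetical usage sketch; identifiers below come from the caller's scope.
    const action = createRecordBasedRulesProcessor(
      context,
      fileDescriptor,
      result => result.forEach(issue => issueEmitter.emit('issue', issue.view())),
      () => logger.progress() // ticks once per fully processed datapoint file
    );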
(3 more changed files were not loaded on this page)
