Skip to content

Commit

Permalink
Migrated from sax to saxes.
Browse files Browse the repository at this point in the history
Migrated from stream pipe to for-await-of iterable.
All event handlers turned into async.
Implemented parsing with nested async/await event handlers support. All parser events are processed one-by-one according to their actual order.
  • Loading branch information
dobromyslov committed Jan 31, 2021
1 parent 8be3fbb commit bcf3d8b
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 108 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@
},
"dependencies": {
"commerceml-parser-core": "^2.1.1",
"emittery": "^0.8.1",
"entities": "^2.1.0",
"sax": "^1.2.4",
"saxes": "^5.0.1",
"x2js": "^3.4.0"
},
"devDependencies": {
Expand Down
61 changes: 45 additions & 16 deletions spec/import.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {CommerceMlImportParser} from 'commerceml-parser';
import {CommerceMlImportParser} from '../src';
import {createReadStream} from 'fs';

describe('async import', () => {
Expand All @@ -7,39 +7,68 @@ describe('async import', () => {
const parser = new CommerceMlImportParser();

parser.onCommercialInformation(commercialInformation => {
return new Promise((resolve, reject) => {
console.log('Commercial Information');
});
console.log('Commercial Information');
/*
return new Promise<void>(async (resolve, reject) => {
await new Promise<void>((resolve1, reject1) => {
console.log('Commercial Information');
resolve1();
});
resolve();
});*/
});

parser.onClassifier(classifier => {
return new Promise((resolve, reject) => {
console.log('Classifier');
return new Promise<void>(async (resolve) => {
await new Promise<void>((resolve1) => {
console.log('Classifier');
resolve1();
});
resolve();
});
});

parser.onClassifierGroup(classifierGroup => {
return new Promise((resolve, reject) => {
console.log('Classifier group');
return new Promise<void>(async (resolve) => {
await new Promise<void>((resolve1) => {
console.log('Classifier group');
resolve1();
});
resolve();
});
});

parser.onClassifierProperty(classifierProperty => {
return new Promise((resolve, reject) => {
console.log('Classifier property');
return new Promise<void>(async (resolve) => {
await new Promise<void>((resolve1) => {
console.log('Classifier property');
resolve1();
});
resolve();
});
});

parser.onCatalog(catalog => {
return new Promise((resolve, reject) => {
for (let i = 0; i < 10000000; i++) {}
console.log('Catalog');
parser.onCatalog(async catalog => {
console.log('Start');
await new Promise<void>(async (resolve) => {
await new Promise<void>((resolve1) => {
for (let i = 0; i < 1000000000; i++) {
}
console.log('First Level Promise End');
resolve1();
});
console.log('Second Level Promise End');
resolve();
});
});

parser.onProduct(product => {
return new Promise((resolve, reject) => {
console.log('Product');
return new Promise<void>(async (resolve) => {
await new Promise<void>((resolve1) => {
console.log('Product', product.id);
resolve1();
});
resolve();
});
});

Expand Down
198 changes: 146 additions & 52 deletions src/abstract-parser.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {createStream, QualifiedTag, SAXStream, Tag} from 'sax';
import {encodeXML} from 'entities/lib';
import X2JS from 'x2js';
import {SaxesParser, SaxesTagPlain} from 'saxes';
import Emittery from 'emittery';
import {Readable} from 'stream';

export interface CommerceMlRule {
Expand All @@ -10,12 +11,24 @@ export interface CommerceMlRule {

export interface CommerceMlRules extends Record<string, CommerceMlRule> {}

export abstract class CommerceMlAbstractParser {
/**
* SAX Parser instance.
*/
protected stream: SAXStream;
/**
* Saxes event which is buffered and returned by low level parseChunk() generator.
*/
export interface SaxesEvent {
type: 'opentag' | 'text' | 'closetag' | 'end';
tag?: SaxesTagPlain;
text?: string;
}

/**
* CommerceML event which is buffered and returned by top level parse() generator.
*/
export interface ParserEvent {
type: string;
data?: unknown;
}

export abstract class CommerceMlAbstractParser {
/**
* Current position in the xml file.
* Array of tag names.
Expand All @@ -34,79 +47,158 @@ export abstract class CommerceMlAbstractParser {
protected currentRuleKey = '';

/**
*
* XML content collected starting from the openTag (inclusive) till the close tag (inclusive).
*/
protected xml = '';

/**
*
* Indicates whether current XML node should be collected according to parser rules.
*/
protected collectCurrentNode = false;

/**
*
* List of currently opened tags.
* i.e. xpath.
*/
protected openTags: string[] = [];

/**
* Buffer of parser events collected from one parsed chunk.
* @protected
*/
protected events: ParserEvent[] = [];

/**
* Serial synchronous Event emitter.
* @protected
*/
protected eventEmitter: Emittery = new Emittery();

/**
* XML to plain JS objects convertor.
* @protected
*/
protected x2js = new X2JS();

/**
* CommerceML rules defines which XML tags should be collected and returned.
* @protected
*/
protected abstract rules: CommerceMlRules;

constructor() {
this.stream = createStream(true, {
trim: true,
normalize: true
});
public async parse(readable: Readable): Promise<void> {
// https://nodejs.org/api/stream.html
// chunk of data is a string if a default encoding has been specified
// for the stream using the readable.setEncoding() method;
// otherwise the data will be passed as a Buffer.
readable.setEncoding('utf8');
return this.parseIterable(readable as unknown as Iterable<string>);
}

this.stream.on('opentag', (tag: Tag | QualifiedTag) => {
this.onOpenTag(tag);
});
public async parseIterable(iterable: Iterable<string>): Promise<void> {
for await (const saxesEvents of this.parseChunk(iterable) ?? []) {
for (const saxesEvent of saxesEvents ?? []) {
switch (saxesEvent.type) {
case 'opentag':
this.onOpenTag(saxesEvent.tag!);
break;

case 'text':
this.onText(saxesEvent.text!);
break;

case 'closetag':
this.onCloseTag(saxesEvent.tag!);
break;

case 'end':
this.events.push({type: 'end'});
break;
}
}

this.stream.on('closetag', (tagName: string) => {
this.onCloseTag(tagName);
});
await this.processParserEvents(this.events);
this.events = [];
}
}

this.stream.on('text', (text: string) => {
this.onText(text);
});
/**
* Processes parser events.
* @param events Collected parser events during one XML chunk parsing.
* @protected
*/
protected async processParserEvents(events: ParserEvent[]): Promise<void> {
// TODO: decide which API to use: an event emitter or a plain synchronous functions
for (const event of events ?? []) {
await this.eventEmitter.emitSerial(event.type, event.data);
}
}

public onEnd(callback: () => void) {
this.stream.on('end', callback);
public onEnd(
callback: () => void | Promise<void>
): void {
this.eventEmitter.on('end', callback);
}

/**
* Starts parsing readable stream.
* @param readStream
* Generator method.
* Parses one chunk of the iterable input (Readable stream in the string data reading mode).
* @see https://nodejs.org/api/stream.html#stream_event_data
* @param iterable Iterable or Readable stream in the string data reading mode.
* @returns Array of SaxesParser events
* @throws Error if a SaxesParser error event was emitted.
*/
public async parse(readStream: Readable): Promise<void> {
return new Promise((resolve, reject) => {
this.stream.on('end', () => {
resolve();
public async *parseChunk(iterable: Iterable<string>): AsyncGenerator<SaxesEvent[], void, undefined> {
const saxesParser = new SaxesParser<{}>();
let error;
saxesParser.on('error', _error => {
error = _error;
});

// As a performance optimization, we gather all events instead of passing
// them one by one, which would cause each event to go through the event queue
let events: SaxesEvent[] = [];
saxesParser.on('opentag', tag => {
events.push({
type: 'opentag',
tag
});
});

this.stream.on('error', error => {
reject(error);
saxesParser.on('text', text => {
events.push({
type: 'text',
text
});
});

readStream.pipe(this.stream);
saxesParser.on('closetag', tag => {
events.push({
type: 'closetag',
tag
});
});
}

/**
* Returns SAX stream.
*/
public getStream(): SAXStream {
return this.stream;
}
for await (const chunk of iterable) {
saxesParser.write(chunk as string);
if (error) {
throw error;
}

yield events;
events = [];
}

public end(): void {
this.stream.end();
yield [{
type: 'end'
}];
}

/**
* SAX 'opentag' event handler.
* @param tag
*/
protected onOpenTag(tag: Tag | QualifiedTag) {
protected onOpenTag(tag: SaxesTagPlain): void {
this.openTag = tag.name;
this.xPath.push(tag.name);
this.collectCurrentNode = false;
Expand All @@ -116,7 +208,7 @@ export abstract class CommerceMlAbstractParser {
if (this.isCurrentXPathEqualsToRuleXPath(rule.start)) {
// If currentRule key already set then finish previous XML collection
if (this.currentRuleKey) {
this.emitCollected();
this.addEventToBuffer();
}

// Start new XML collection
Expand All @@ -135,7 +227,6 @@ export abstract class CommerceMlAbstractParser {
}

this.xml += '>';
return this.xml;
}

/**
Expand All @@ -157,7 +248,7 @@ export abstract class CommerceMlAbstractParser {
return false;
}

protected emitCollected(): void {
protected addEventToBuffer(): void {
// Close all opened tags
if (this.openTags.length > 0) {
for (const tagName of this.openTags.reverse()) {
Expand All @@ -167,8 +258,11 @@ export abstract class CommerceMlAbstractParser {
this.openTags = [];
}

const x2js = new X2JS();
this.stream?.emit(this.currentRuleKey, x2js.xml2js(this.xml));
// Add parser event to the buffer
this.events.push({
type: this.currentRuleKey,
data: this.x2js.xml2js(this.xml)
});

this.currentRuleKey = '';
this.xml = '';
Expand All @@ -180,15 +274,15 @@ export abstract class CommerceMlAbstractParser {
}
}

protected onCloseTag(tagName: string) {
protected onCloseTag(tag: SaxesTagPlain): void {
if (this.currentRuleKey) {
if (this.shallCollect()) {
this.xml += `</${tagName}>`;
this.xml += `</${tag.name}>`;
this.openTags.pop();
}

if (this.isCurrentXPathEqualsToRuleXPath(this.rules[this.currentRuleKey].start)) {
this.emitCollected();
this.addEventToBuffer();
}
}

Expand Down
Loading

0 comments on commit bcf3d8b

Please sign in to comment.