Data parsing using ES6 Async Iterators
Processing huge files in Node.js can be hard, especially when you also need to send data to, or retrieve data from, external sources.
This package can:
- Parse big CSV | XML | JSON files in a memory-efficient way.
- Write data to a CSV | JSON | XML file in a memory-efficient way.
Async iterators are natively supported in Node.js 10.x. If you're using Node.js 8.x or 9.x, you need to run Node.js with the --harmony_async_iteration flag.
Async iterators are not supported in Node.js 6.x or 7.x, so on an older version you need to upgrade Node.js to use them.
$ npm install iterparse
Or using yarn:
$ yarn add iterparse
For processing iterators I recommend using the IxJS library:
import { AsyncIterable } from "ix";
import { csvRead, csvWrite } from "iterparse";

interface MyCSVObject {
    prop1: string;
    prop2: boolean;
    prop3: number;
}

AsyncIterable
    .from(csvRead<MyCSVObject>("./big_csv_file.csv"))
    .map((obj) => {
        // Transform the object in any way
        return {
            a: obj.prop3 * 5,
            b: obj.prop1.toUpperCase()
        };
    })
    .pipe(csvWrite("./big_csv_file_2.csv"))
    .count(); // Consume the iterator; nothing runs until it is consumed
Or just loop over the iterator like this:
async function process() {
for await (const data of csvRead<MyCSVObject>("./big_csv_file.csv")) {
// Do anything with data
}
}
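To try the loop above without a real file, here is a minimal sketch that uses a hypothetical `fakeRows` async generator as a stand-in for `csvRead`. The `Row` type, `fakeRows` and `sumIds` are assumptions made up for illustration; only the `for await` pattern carries over to the real readers.

```typescript
// Hypothetical row shape, used only for this sketch.
interface Row {
    id: number;
    name: string;
}

// Stand-in for a reader such as csvRead: yields rows one at a time.
async function* fakeRows(count: number): AsyncGenerator<Row> {
    for (let id = 1; id <= count; id++) {
        yield { id, name: `row-${id}` };
    }
}

// Consumes any async iterable of rows and keeps only a running total,
// so memory use does not grow with the number of rows.
async function sumIds(rows: AsyncIterable<Row>): Promise<number> {
    let total = 0;
    for await (const row of rows) {
        total += row.id;
    }
    return total;
}
```

Because `sumIds` accepts any `AsyncIterable<Row>`, the same function should work with a real reader once the row type matches.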
Usage in e-commerce
Big e-shops can have feeds with 100k or more products. Loading all this data at once is really impractical.
const productCount = 100000;
const productSizeInKb = 20;
const totalMemoryConsumption = productCount * productSizeInKb * 1024; // 2gb of memory just to load data
Based on this calculation, we will use about 2 GB of memory just to load the data. Once we start working with the data, the memory footprint can grow 6 to 10 times.
We can use Node.js streams to solve this problem, but working with streams is mind-bending and hard, especially when you need to manipulate data in a meaningful way and send it to an external source such as:
- an API
- a machine learning network
- a database
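One pattern that can make sending items to an API or database more manageable is grouping them into fixed-size batches as they stream by. Below is a minimal sketch, assuming a hypothetical `batch` helper written as a plain async generator; it is not presented here as part of iterparse, and `numbers` is only a stand-in source.

```typescript
// Hypothetical helper: groups items from any async iterable into arrays of
// at most `size` items, so each batch can be sent in a single request.
async function* batch<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
    if (!Number.isInteger(size) || size < 1) {
        throw new RangeError("size must be a positive integer");
    }
    let current: T[] = [];
    for await (const item of source) {
        current.push(item);
        if (current.length === size) {
            yield current;
            current = [];
        }
    }
    // Emit the final, possibly smaller, batch.
    if (current.length > 0) {
        yield current;
    }
}

// Stand-in source for the example; a real one could be a file reader.
async function* numbers(count: number): AsyncGenerator<number> {
    for (let i = 1; i <= count; i++) {
        yield i;
    }
}
```

Looping over `batch(numbers(5), 2)` with `for await` receives `[1, 2]`, `[3, 4]`, then `[5]`; each array could be passed to a single bulk insert or HTTP request.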
Some examples of what we can do with iterparse:
import { AsyncIterable } from 'ix';
import { xmlRead, jsonWrite } from 'iterparse'
interface Video {
id: string,
url: string,
description: string
}
async function getListOfYoutubeVideos(query: string): Promise<Video[]> {
    // I will not implement real logic here.
    // Just keep in mind that this function will make some HTTP requests.
    ...
}
function getProductBaseInfo(data: XMLObject): { id: string, url: string, title: string } {...}
AsyncIterable
    .from(xmlRead("./big_product_feed.xml", { pattern: 'product' }))
    .map(getProductBaseInfo) // Extract the data that we need
    .map(async ({ title, ...rest }) => {
        const videos = await getListOfYoutubeVideos(title) // And we can execute our async task here
        return {
            videos,
            title,
            ...rest
        }
    })
    .pipe(jsonWrite(`./small_feed_with_videos.json`)) // Write all extracted data to a JSON file
    .count()
// All iterators must be consumed in some way.
// I just picked count().
// Other alternatives are toArray(), forEach(), reduce(), etc.
Keep in mind this is a trivial example, but it illustrates how to process huge amounts of data.
- Output - Valid path to file or writable stream
type Output = string | NodeJS.WriteStream;
- Source - Valid path or readable stream
type Source = string | NodeJS.ReadableStream;
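A union type like `Source` is usually narrowed with a `typeof` check before reading. The sketch below shows one way that could look; `toReadable` and `readAll` are hypothetical helpers and not necessarily how iterparse handles this internally. The type is redeclared locally so the snippet stands alone.

```typescript
import { createReadStream } from "fs";
import { Readable } from "stream";

type Source = string | NodeJS.ReadableStream;

// Hypothetical helper: a string is treated as a file path and opened as a
// stream; an existing stream is passed through unchanged.
function toReadable(source: Source): NodeJS.ReadableStream {
    return typeof source === "string" ? createReadStream(source) : source;
}

// Reads the whole source as text. Only suitable for small inputs, and it
// assumes chunk boundaries do not split multi-byte characters. Shown only to
// demonstrate that both kinds of Source can be consumed the same way.
async function readAll(source: Source): Promise<string> {
    let text = "";
    for await (const chunk of toReadable(source)) {
        text += chunk.toString();
    }
    return text;
}

// Example in-memory stream, handy for tests where no file is available.
const inMemory = Readable.from(["ab", "cd"]);
```

Accepting both forms lets callers pass a plain path in the common case and a custom stream (for example an HTTP response body) when they need more control.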
- csvRead - Reads a CSV file row by row.
interface CSVReadOptions {
    delimiter?: string; // default: ","
    newline?: string; // default: "\r\n"
    quoteChar?: string; // default: '"'
    escapeChar?: string; // default: '"'
    header?: boolean; // default: false
    trimHeaders?: boolean; // default: false
    dynamicTyping?: boolean; // default: true
    encoding?: string; // default: ""
    comments?: boolean | string; // default: false
    skipEmptyLines?: boolean | 'greedy'; // default: false
}
type CSVObject = { [k: string]: string | boolean | number }
function csvRead<T extends CSVObject>(input: Source, options?: CSVReadOptions): AsyncIterable<T>
- csvWrite - Writes each iterator item to a CSV file and returns the same item for further processing.
interface CSVWriteOptions {
    quotes?: boolean | boolean[]; // default: false
    quoteChar?: string; // default: '"'
    escapeChar?: string; // default: '"'
    delimiter?: string; // default: ","
    header?: boolean; // default: true
    newline?: string; // default: "\r\n"
    skipEmptyLines?: boolean | "greedy"; // default: false
    columns?: string[]; // default: null
}
type CSVObject = { [k: string]: string | boolean | number };
function csvWrite<T extends CSVObject>(
    out: Output
): OperatorAsyncFunction<T, T>;
function csvWrite<T extends CSVObject>(
    data: AnyIterable<T>,
    out: Output
): AsyncIterableX<T>;
- jsonRead - Reads a JSON file without loading the entire file into memory.
interface JSONReadOptions {
    /**
     * Pattern option that defines where to find the array items
     * (*) => [{...}, {...}]
     * (*.a) => { a: [{...}, {...}] }
     * (*.a.b) => { a: { b: [{...}, {...}] } }
     */
    pattern: string;
}
function jsonRead<T>(
    source: Source,
    options: JSONReadOptions
): AsyncIterable<T>;
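To illustrate what the `pattern` option selects, here is a sketch that applies the same dotted paths to an already-parsed value. It is only a model of the mapping listed in the `JSONReadOptions` comment: `selectByPattern` is a hypothetical function, it works on in-memory data, and it says nothing about how jsonRead streams the file internally. The requirement that a pattern starts with `*` is an assumption taken from the three examples.

```typescript
// Hypothetical model of the pattern semantics: "*" stands for the root value
// and each following segment is a property name leading to the array of items.
function selectByPattern(root: unknown, pattern: string): unknown[] {
    const [first, ...keys] = pattern.split(".");
    if (first !== "*") {
        throw new Error(`pattern must start with "*", got "${pattern}"`);
    }
    let current: unknown = root;
    for (const key of keys) {
        if (typeof current !== "object" || current === null) {
            return []; // The path does not exist in this document
        }
        current = (current as Record<string, unknown>)[key];
    }
    // Only arrays contain items to iterate over; anything else yields nothing.
    return Array.isArray(current) ? current : [];
}
```

For example, `selectByPattern({ a: { b: [1, 2] } }, "*.a.b")` returns `[1, 2]`, matching the third case in the comment.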
- jsonWrite - Writes the iterator to a JSON file in a memory-efficient manner.
Some considerations:
- If the file exists, the function will overwrite it
- If the path does not exist, the function will create the path to the file
export function jsonWrite<T>(out: Output): OperatorAsyncFunction<T, T>;
export function jsonWrite<T>(
    out: Output,
    data: AnyIterable<T>,
): AsyncIterable<T>;
- xmlRead - Reads an XML file without loading the entire file into memory.
type XMLAttributes = Record<string, string>;
type XMLMarkup = XMLObject | string;
type XMLObject = {
    $name: string; // Tag name
    $attrs?: XMLAttributes; // If the tag has attributes
    $text?: string;
    // If the node has mixed content: <person>text<friend>asd</friend></person>
    $markup?: ReadonlyArray<XMLMarkup>; // then we will have markup props
    // In most cases we will have an object of this format
    [d: string]:
        | string
        | XMLMarkup
        | XMLAttributes
        | ReadonlyArray<XMLMarkup>
        | undefined
        | Object;
};
interface XMLReadOptions {
    pattern: string;
}
function xmlRead<T extends XMLObject>(
    source: Source,
    options: { pattern: string }
): AsyncIterable<T>;
Usage
path_to_xml.xml file:
<root>
    <person>
        <name>Bill</name>
        <id>1</id>
        <age>27</age>
    </person>
    <person>
        <name>Sally</name>
        <id>2</id>
        <age>29</age>
    </person>
    <person>
        <name>Kelly</name>
        <id>3</id>
        <age>37</age>
    </person>
</root>
And in code:
const xmlIter = xmlRead("./path_to_xml.xml", { pattern: "person" });
for await (const data of xmlIter) {
    // Items are received one by one:
    // { name: "Bill", id: "1", age: "27" }
    // { name: "Sally", id: "2", age: "29" }
    // { name: "Kelly", id: "3", age: "37" }
    // Do whatever you want here
    await sendDataToServer(data);
}
// Or my preferred way
import { AsyncIterable } from "ix";
AsyncIterable.from(xmlRead("./path_to_xml.xml", { pattern: "person" }))
    .map(async (data) => {
        await sendDataToServer(data);
    })
    .count(); // Consume the iterator
- xmlWrite - Writes the iterator to an XML file in a memory-efficient manner.
Some considerations:
- If the file exists, the function will overwrite it
- If the path does not exist, the function will create the whole folder structure
xmlWrite([{ a: 5 }], "./data/sub/ax/we/c.xml"); // Function will create the path
export function xmlWrite(
    out: Output
): OperatorAsyncFunction<XMLObject, XMLObject>;
export function xmlWrite(
    out: Output,
    data: AnyIterable<XMLObject>,
): AsyncIterable<XMLObject>;
Usage - It's just a trivial example
import { AsyncIterable } from "ix";
const obj = [
    {
        $name: "person",
        name: "John",
        tags: [
            { $name: "tag", $attrs: { id: "1" }, $text: "Senior developer" },
            { $name: "tag", $attrs: { id: "2" }, $text: "Fishing" },
        ],
    },
    {
        $name: "person",
        name: "Bill",
        tags: [{ $name: "tag", $attrs: { id: "3" }, $text: "Gaming" }],
    },
];
AsyncIterable.from(obj) // Converting a regular array to an AsyncIterable
    .pipe(xmlWrite("result.xml")) // Writing objects to the file
    .count(); // Consuming the iterator
result.xml
<root>
    <person>
        <name>John</name>
        <tags>
            <tag id="1">Senior developer</tag>
            <tag id="2">Fishing</tag>
        </tags>
    </person>
    <person>
        <name>Bill</name>
        <tags>
            <tag id="3">Gaming</tag>
        </tags>
    </person>
</root>