From 80139ff5866f8a3bc5099ce5c8cef3e637035581 Mon Sep 17 00:00:00 2001 From: spalger Date: Tue, 28 Apr 2020 11:50:05 -0700 Subject: [PATCH 1/3] typescript-ify portions of src/optimize --- src/legacy/server/kbn_server.d.ts | 1 + .../{bundles_route.js => bundles_route.ts} | 23 ++++- ..._response.js => dynamic_asset_response.ts} | 49 ++++++---- .../{file_hash.js => file_hash.ts} | 25 +++-- src/optimize/bundles_route/file_hash_cache.ts | 36 +++++++ .../bundles_route/{index.js => index.ts} | 0 ...undles_route.js => proxy_bundles_route.ts} | 4 +- src/optimize/index.js | 71 +------------- ...ic_dirs.js => np_ui_plugin_public_dirs.ts} | 19 ++-- src/optimize/optimize_mixin.ts | 98 +++++++++++++++++++ ...ceholder.js => public_path_placeholder.ts} | 26 +++-- 11 files changed, 230 insertions(+), 122 deletions(-) rename src/optimize/bundles_route/{bundles_route.js => bundles_route.ts} (89%) rename src/optimize/bundles_route/{dynamic_asset_response.js => dynamic_asset_response.ts} (77%) rename src/optimize/bundles_route/{file_hash.js => file_hash.ts} (72%) create mode 100644 src/optimize/bundles_route/file_hash_cache.ts rename src/optimize/bundles_route/{index.js => index.ts} (100%) rename src/optimize/bundles_route/{proxy_bundles_route.js => proxy_bundles_route.ts} (87%) rename src/optimize/{np_ui_plugin_public_dirs.js => np_ui_plugin_public_dirs.ts} (73%) create mode 100644 src/optimize/optimize_mixin.ts rename src/optimize/{public_path_placeholder.js => public_path_placeholder.ts} (76%) diff --git a/src/legacy/server/kbn_server.d.ts b/src/legacy/server/kbn_server.d.ts index 0d2f3528c9019..02491ff872981 100644 --- a/src/legacy/server/kbn_server.d.ts +++ b/src/legacy/server/kbn_server.d.ts @@ -153,6 +153,7 @@ export default class KbnServer { public server: Server; public inject: Server['inject']; public pluginSpecs: any[]; + public uiBundles: any; constructor( settings: Record, diff --git a/src/optimize/bundles_route/bundles_route.js b/src/optimize/bundles_route/bundles_route.ts similarity index 89% rename from src/optimize/bundles_route/bundles_route.js rename to src/optimize/bundles_route/bundles_route.ts index 4030988c8552c..5605ec5338969 100644 --- a/src/optimize/bundles_route/bundles_route.js +++ b/src/optimize/bundles_route/bundles_route.ts @@ -18,10 +18,13 @@ */ import { isAbsolute, extname, join } from 'path'; -import LruCache from 'lru-cache'; + +import Hapi from 'hapi'; import * as UiSharedDeps from '@kbn/ui-shared-deps'; + import { createDynamicAssetResponse } from './dynamic_asset_response'; -import { assertIsNpUiPluginPublicDirs } from '../np_ui_plugin_public_dirs'; +import { FileHashCache } from './file_hash_cache'; +import { assertIsNpUiPluginPublicDirs, NpUiPluginPublicDirs } from '../np_ui_plugin_public_dirs'; import { fromRoot } from '../../core/server/utils'; /** @@ -44,11 +47,17 @@ export function createBundlesRoute({ basePublicPath, builtCssPath, npUiPluginPublicDirs = [], +}: { + regularBundlesPath: string; + dllBundlesPath: string; + basePublicPath: string; + builtCssPath: string; + npUiPluginPublicDirs?: NpUiPluginPublicDirs; }) { // rather than calculate the fileHash on every request, we // provide a cache object to `resolveDynamicAssetResponse()` that // will store the 100 most recently used hashes. - const fileHashCache = new LruCache(100); + const fileHashCache = new FileHashCache(); assertIsNpUiPluginPublicDirs(npUiPluginPublicDirs); if (typeof regularBundlesPath !== 'string' || !isAbsolute(regularBundlesPath)) { @@ -122,6 +131,12 @@ function buildRouteForBundles({ bundlesPath, fileHashCache, replacePublicPath = true, +}: { + publicPath: string; + routePath: string; + bundlesPath: string; + fileHashCache: FileHashCache; + replacePublicPath?: boolean; }) { return { method: 'GET', @@ -130,7 +145,7 @@ function buildRouteForBundles({ auth: false, ext: { onPreHandler: { - method(request, h) { + method(request: Hapi.Request, h: Hapi.ResponseToolkit) { const ext = extname(request.params.path); if (ext !== '.js' && ext !== '.css') { diff --git a/src/optimize/bundles_route/dynamic_asset_response.js b/src/optimize/bundles_route/dynamic_asset_response.ts similarity index 77% rename from src/optimize/bundles_route/dynamic_asset_response.js rename to src/optimize/bundles_route/dynamic_asset_response.ts index 80c49a26270fd..bebc062ee949d 100644 --- a/src/optimize/bundles_route/dynamic_asset_response.js +++ b/src/optimize/bundles_route/dynamic_asset_response.ts @@ -18,14 +18,20 @@ */ import { resolve } from 'path'; -import { open, fstat, createReadStream, close } from 'fs'; +import Fs from 'fs'; +import { promisify } from 'util'; import Boom from 'boom'; -import { fromNode as fcb } from 'bluebird'; +import Hapi from 'hapi'; +import { FileHashCache } from './file_hash_cache'; import { getFileHash } from './file_hash'; import { replacePlaceholder } from '../public_path_placeholder'; +const asyncOpen = promisify(Fs.open); +const asyncClose = promisify(Fs.close); +const asyncFstat = promisify(Fs.fstat); + /** * Create a Hapi response for the requested path. This is designed * to replicate a subset of the features provided by Hapi's Inert @@ -44,39 +50,46 @@ import { replacePlaceholder } from '../public_path_placeholder'; * - cached hash/etag is based on the file on disk, but modified * by the public path so that individual public paths have * different etags, but can share a cache - * - * @param {Object} options - * @property {Hapi.Request} options.request - * @property {string} options.bundlesPath - * @property {string} options.publicPath - * @property {LruCache} options.fileHashCache */ -export async function createDynamicAssetResponse(options) { - const { request, h, bundlesPath, publicPath, fileHashCache, replacePublicPath } = options; +export async function createDynamicAssetResponse({ + request, + h, + bundlesPath, + publicPath, + fileHashCache, + replacePublicPath, +}: { + request: Hapi.Request; + h: Hapi.ResponseToolkit; + bundlesPath: string; + publicPath: string; + fileHashCache: FileHashCache; + replacePublicPath: boolean; +}) { + let fd: number | undefined; - let fd; try { const path = resolve(bundlesPath, request.params.path); // prevent path traversal, only process paths that resolve within bundlesPath if (!path.startsWith(bundlesPath)) { - throw Boom.forbidden(null, 'EACCES'); + throw Boom.forbidden(undefined, 'EACCES'); } // we use and manage a file descriptor mostly because // that's what Inert does, and since we are accessing // the file 2 or 3 times per request it seems logical - fd = await fcb(cb => open(path, 'r', cb)); + fd = await asyncOpen(path, 'r'); - const stat = await fcb(cb => fstat(fd, cb)); + const stat = await asyncFstat(fd); const hash = await getFileHash(fileHashCache, path, stat, fd); - const read = createReadStream(null, { + const read = Fs.createReadStream(null as any, { fd, start: 0, autoClose: true, }); - fd = null; // read stream is now responsible for fd + fd = undefined; // read stream is now responsible for fd const content = replacePublicPath ? replacePlaceholder(read, publicPath) : read; const etag = replacePublicPath ? `${hash}-${publicPath}` : hash; @@ -91,8 +104,8 @@ export async function createDynamicAssetResponse(options) { } catch (error) { if (fd) { try { - await fcb(cb => close(fd, cb)); - } catch (error) { + await asyncClose(fd); + } catch (_) { // ignore errors from close, we already have one to report // and it's very likely they are the same } diff --git a/src/optimize/bundles_route/file_hash.js b/src/optimize/bundles_route/file_hash.ts similarity index 72% rename from src/optimize/bundles_route/file_hash.js rename to src/optimize/bundles_route/file_hash.ts index d9464cf05eca1..a71c31ad70d42 100644 --- a/src/optimize/bundles_route/file_hash.js +++ b/src/optimize/bundles_route/file_hash.ts @@ -18,20 +18,17 @@ */ import { createHash } from 'crypto'; -import { createReadStream } from 'fs'; +import Fs from 'fs'; import * as Rx from 'rxjs'; -import { merge, mergeMap, takeUntil } from 'rxjs/operators'; +import { takeUntil } from 'rxjs/operators'; + +import { FileHashCache } from './file_hash_cache'; /** * Get the hash of a file via a file descriptor - * @param {LruCache} cache - * @param {string} path - * @param {Fs.Stat} stat - * @param {Fs.FileDescriptor} fd - * @return {Promise} */ -export async function getFileHash(cache, path, stat, fd) { +export async function getFileHash(cache: FileHashCache, path: string, stat: Fs.Stats, fd: number) { const key = `${path}:${stat.ino}:${stat.size}:${stat.mtime.getTime()}`; const cached = cache.get(key); @@ -40,17 +37,17 @@ export async function getFileHash(cache, path, stat, fd) { } const hash = createHash('sha1'); - const read = createReadStream(null, { + const read = Fs.createReadStream(null as any, { fd, start: 0, autoClose: false, }); - const promise = Rx.fromEvent(read, 'data') - .pipe( - merge(Rx.fromEvent(read, 'error').pipe(mergeMap(Rx.throwError))), - takeUntil(Rx.fromEvent(read, 'end')) - ) + const promise = Rx.merge( + Rx.fromEvent(read, 'data'), + Rx.fromEvent(read, 'error').pipe(Rx.throwError) + ) + .pipe(takeUntil(Rx.fromEvent(read, 'end'))) .forEach(chunk => hash.update(chunk)) .then(() => hash.digest('hex')) .catch(error => { diff --git a/src/optimize/bundles_route/file_hash_cache.ts b/src/optimize/bundles_route/file_hash_cache.ts new file mode 100644 index 0000000000000..a7cdabbff13a7 --- /dev/null +++ b/src/optimize/bundles_route/file_hash_cache.ts @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import LruCache from 'lru-cache'; + +export class FileHashCache { + private lru = new LruCache>(100); + + get(key: string) { + return this.lru.get(key); + } + + set(key: string, value: Promise) { + this.lru.set(key, value); + } + + del(key: string) { + this.lru.del(key); + } +} diff --git a/src/optimize/bundles_route/index.js b/src/optimize/bundles_route/index.ts similarity index 100% rename from src/optimize/bundles_route/index.js rename to src/optimize/bundles_route/index.ts diff --git a/src/optimize/bundles_route/proxy_bundles_route.js b/src/optimize/bundles_route/proxy_bundles_route.ts similarity index 87% rename from src/optimize/bundles_route/proxy_bundles_route.js rename to src/optimize/bundles_route/proxy_bundles_route.ts index fff0ec444d95b..97616f7041f1c 100644 --- a/src/optimize/bundles_route/proxy_bundles_route.js +++ b/src/optimize/bundles_route/proxy_bundles_route.ts @@ -17,7 +17,7 @@ * under the License. */ -export function createProxyBundlesRoute({ host, port }) { +export function createProxyBundlesRoute({ host, port }: { host: string; port: number }) { return [ buildProxyRouteForBundles('/bundles/', host, port), buildProxyRouteForBundles('/built_assets/dlls/', host, port), @@ -25,7 +25,7 @@ export function createProxyBundlesRoute({ host, port }) { ]; } -function buildProxyRouteForBundles(routePath, host, port) { +function buildProxyRouteForBundles(routePath: string, host: string, port: number) { return { path: `${routePath}{path*}`, method: 'GET', diff --git a/src/optimize/index.js b/src/optimize/index.js index b7b9f7712358a..363f81a6a3a96 100644 --- a/src/optimize/index.js +++ b/src/optimize/index.js @@ -17,72 +17,5 @@ * under the License. */ -import FsOptimizer from './fs_optimizer'; -import { createBundlesRoute } from './bundles_route'; -import { DllCompiler } from './dynamic_dll_plugin'; -import { fromRoot } from '../core/server/utils'; -import { getNpUiPluginPublicDirs } from './np_ui_plugin_public_dirs'; - -export default async (kbnServer, server, config) => { - if (!config.get('optimize.enabled')) return; - - // the watch optimizer sets up two threads, one is the server listening - // on 5601 and the other is a server listening on 5602 that builds the - // bundles in a "middleware" style. - // - // the server listening on 5601 may be restarted a number of times, depending - // on the watch setup managed by the cli. It proxies all bundles/* and built_assets/dlls/* - // requests to the other server. The server on 5602 is long running, in order - // to prevent complete rebuilds of the optimize content. - const watch = config.get('optimize.watch'); - if (watch) { - return await kbnServer.mixin(require('./watch/watch')); - } - - const { uiBundles } = kbnServer; - server.route( - createBundlesRoute({ - regularBundlesPath: uiBundles.getWorkingDir(), - dllBundlesPath: DllCompiler.getRawDllConfig().outputPath, - basePublicPath: config.get('server.basePath'), - builtCssPath: fromRoot('built_assets/css'), - npUiPluginPublicDirs: getNpUiPluginPublicDirs(kbnServer), - }) - ); - - // in prod, only bundle when something is missing or invalid - const reuseCache = config.get('optimize.useBundleCache') - ? await uiBundles.areAllBundleCachesValid() - : false; - - // we might not have any work to do - if (reuseCache) { - server.log(['debug', 'optimize'], `All bundles are cached and ready to go!`); - return; - } - - await uiBundles.resetBundleDir(); - - // only require the FsOptimizer when we need to - const optimizer = new FsOptimizer({ - logWithMetadata: (tags, message, metadata) => server.logWithMetadata(tags, message, metadata), - uiBundles, - profile: config.get('optimize.profile'), - sourceMaps: config.get('optimize.sourceMaps'), - workers: config.get('optimize.workers'), - }); - - server.log( - ['info', 'optimize'], - `Optimizing and caching ${uiBundles.getDescription()}. This may take a few minutes` - ); - - const start = Date.now(); - await optimizer.run(); - const seconds = ((Date.now() - start) / 1000).toFixed(2); - - server.log( - ['info', 'optimize'], - `Optimization of ${uiBundles.getDescription()} complete in ${seconds} seconds` - ); -}; +import { optimizeMixin } from './optimize_mixin'; +export default optimizeMixin; diff --git a/src/optimize/np_ui_plugin_public_dirs.js b/src/optimize/np_ui_plugin_public_dirs.ts similarity index 73% rename from src/optimize/np_ui_plugin_public_dirs.js rename to src/optimize/np_ui_plugin_public_dirs.ts index de05fd2b863b8..268d9b1485dd8 100644 --- a/src/optimize/np_ui_plugin_public_dirs.js +++ b/src/optimize/np_ui_plugin_public_dirs.ts @@ -17,7 +17,14 @@ * under the License. */ -export function getNpUiPluginPublicDirs(kbnServer) { +import KbnServer from '../legacy/server/kbn_server'; + +export type NpUiPluginPublicDirs = Array<{ + id: string; + path: string; +}>; + +export function getNpUiPluginPublicDirs(kbnServer: KbnServer): NpUiPluginPublicDirs { return Array.from(kbnServer.newPlatform.__internals.uiPlugins.internal.entries()).map( ([id, { publicTargetDir }]) => ({ id, @@ -26,17 +33,17 @@ export function getNpUiPluginPublicDirs(kbnServer) { ); } -export function isNpUiPluginPublicDirs(something) { +export function isNpUiPluginPublicDirs(x: any): x is NpUiPluginPublicDirs { return ( - Array.isArray(something) && - something.every( + Array.isArray(x) && + x.every( s => typeof s === 'object' && s && typeof s.id === 'string' && typeof s.path === 'string' ) ); } -export function assertIsNpUiPluginPublicDirs(something) { - if (!isNpUiPluginPublicDirs(something)) { +export function assertIsNpUiPluginPublicDirs(x: any): asserts x is NpUiPluginPublicDirs { + if (!isNpUiPluginPublicDirs(x)) { throw new TypeError( 'npUiPluginPublicDirs must be an array of objects with string `id` and `path` properties' ); diff --git a/src/optimize/optimize_mixin.ts b/src/optimize/optimize_mixin.ts new file mode 100644 index 0000000000000..5abe43ca1577a --- /dev/null +++ b/src/optimize/optimize_mixin.ts @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import Hapi from 'hapi'; + +// @ts-ignore not TS yet +import FsOptimizer from './fs_optimizer'; +import { createBundlesRoute } from './bundles_route'; +// @ts-ignore not TS yet +import { DllCompiler } from './dynamic_dll_plugin'; +import { fromRoot } from '../core/server/utils'; +import { getNpUiPluginPublicDirs } from './np_ui_plugin_public_dirs'; +import KbnServer, { KibanaConfig } from '../legacy/server/kbn_server'; + +export const optimizeMixin = async ( + kbnServer: KbnServer, + server: Hapi.Server, + config: KibanaConfig +) => { + if (!config.get('optimize.enabled')) return; + + // the watch optimizer sets up two threads, one is the server listening + // on 5601 and the other is a server listening on 5602 that builds the + // bundles in a "middleware" style. + // + // the server listening on 5601 may be restarted a number of times, depending + // on the watch setup managed by the cli. It proxies all bundles/* and built_assets/dlls/* + // requests to the other server. The server on 5602 is long running, in order + // to prevent complete rebuilds of the optimize content. + const watch = config.get('optimize.watch'); + if (watch) { + // eslint-disable-next-line @typescript-eslint/no-var-requires + return await kbnServer.mixin(require('./watch/watch')); + } + + const { uiBundles } = kbnServer; + server.route( + createBundlesRoute({ + regularBundlesPath: uiBundles.getWorkingDir(), + dllBundlesPath: DllCompiler.getRawDllConfig().outputPath, + basePublicPath: config.get('server.basePath'), + builtCssPath: fromRoot('built_assets/css'), + npUiPluginPublicDirs: getNpUiPluginPublicDirs(kbnServer), + }) + ); + + // in prod, only bundle when something is missing or invalid + const reuseCache = config.get('optimize.useBundleCache') + ? await uiBundles.areAllBundleCachesValid() + : false; + + // we might not have any work to do + if (reuseCache) { + server.log(['debug', 'optimize'], `All bundles are cached and ready to go!`); + return; + } + + await uiBundles.resetBundleDir(); + + // only require the FsOptimizer when we need to + const optimizer = new FsOptimizer({ + logWithMetadata: server.logWithMetadata, + uiBundles, + profile: config.get('optimize.profile'), + sourceMaps: config.get('optimize.sourceMaps'), + workers: config.get('optimize.workers'), + }); + + server.log( + ['info', 'optimize'], + `Optimizing and caching ${uiBundles.getDescription()}. This may take a few minutes` + ); + + const start = Date.now(); + await optimizer.run(); + const seconds = ((Date.now() - start) / 1000).toFixed(2); + + server.log( + ['info', 'optimize'], + `Optimization of ${uiBundles.getDescription()} complete in ${seconds} seconds` + ); +}; diff --git a/src/optimize/public_path_placeholder.js b/src/optimize/public_path_placeholder.ts similarity index 76% rename from src/optimize/public_path_placeholder.js rename to src/optimize/public_path_placeholder.ts index ef05d9e5ae704..1ec2b4a431aa6 100644 --- a/src/optimize/public_path_placeholder.js +++ b/src/optimize/public_path_placeholder.ts @@ -17,14 +17,20 @@ * under the License. */ -import { createReplaceStream } from '../legacy/utils'; +import Stream from 'stream'; +import Fs from 'fs'; import * as Rx from 'rxjs'; import { take, takeUntil } from 'rxjs/operators'; +import { createReplaceStream } from '../legacy/utils'; export const PUBLIC_PATH_PLACEHOLDER = '__REPLACE_WITH_PUBLIC_PATH__'; -export function replacePlaceholder(read, replacement) { +interface ClosableTransform extends Stream.Transform { + close(): void; +} + +export function replacePlaceholder(read: Stream.Readable, replacement: string) { const replace = createReplaceStream(PUBLIC_PATH_PLACEHOLDER, replacement); // handle errors on the read stream by proxying them @@ -37,13 +43,15 @@ export function replacePlaceholder(read, replacement) { replace.end(); }); - replace.close = () => { - read.unpipe(); + const closableReplace: ClosableTransform = Object.assign(replace, { + close: () => { + read.unpipe(); - if (read.close) { - read.close(); - } - }; + if ('close' in read) { + (read as Fs.ReadStream).close(); + } + }, + }); - return read.pipe(replace); + return read.pipe(closableReplace); } From 06c2a8c30c1156bdda22ed339d169cb67b7da647 Mon Sep 17 00:00:00 2001 From: spalger Date: Tue, 28 Apr 2020 12:31:46 -0700 Subject: [PATCH 2/3] Rx.throwError isn't an operator --- src/optimize/bundles_route/file_hash.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/optimize/bundles_route/file_hash.ts b/src/optimize/bundles_route/file_hash.ts index a71c31ad70d42..e66e35a7c497e 100644 --- a/src/optimize/bundles_route/file_hash.ts +++ b/src/optimize/bundles_route/file_hash.ts @@ -21,7 +21,7 @@ import { createHash } from 'crypto'; import Fs from 'fs'; import * as Rx from 'rxjs'; -import { takeUntil } from 'rxjs/operators'; +import { takeUntil, mergeMap } from 'rxjs/operators'; import { FileHashCache } from './file_hash_cache'; @@ -45,7 +45,7 @@ export async function getFileHash(cache: FileHashCache, path: string, stat: Fs.S const promise = Rx.merge( Rx.fromEvent(read, 'data'), - Rx.fromEvent(read, 'error').pipe(Rx.throwError) + Rx.fromEvent(read, 'error').pipe(mergeMap(error => Rx.throwError(error))) ) .pipe(takeUntil(Rx.fromEvent(read, 'end'))) .forEach(chunk => hash.update(chunk)) From 96d3dc4f51c52f1e15971d9542361c5be75cc812 Mon Sep 17 00:00:00 2001 From: spalger Date: Tue, 28 Apr 2020 14:17:58 -0700 Subject: [PATCH 3/3] throwing an error can more straight forward --- src/optimize/bundles_route/file_hash.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/optimize/bundles_route/file_hash.ts b/src/optimize/bundles_route/file_hash.ts index e66e35a7c497e..7b0801098ed10 100644 --- a/src/optimize/bundles_route/file_hash.ts +++ b/src/optimize/bundles_route/file_hash.ts @@ -21,7 +21,7 @@ import { createHash } from 'crypto'; import Fs from 'fs'; import * as Rx from 'rxjs'; -import { takeUntil, mergeMap } from 'rxjs/operators'; +import { takeUntil, map } from 'rxjs/operators'; import { FileHashCache } from './file_hash_cache'; @@ -45,7 +45,11 @@ export async function getFileHash(cache: FileHashCache, path: string, stat: Fs.S const promise = Rx.merge( Rx.fromEvent(read, 'data'), - Rx.fromEvent(read, 'error').pipe(mergeMap(error => Rx.throwError(error))) + Rx.fromEvent(read, 'error').pipe( + map(error => { + throw error; + }) + ) ) .pipe(takeUntil(Rx.fromEvent(read, 'end'))) .forEach(chunk => hash.update(chunk))