From 337d3b62de9eed9655095bd63d79893cb5867006 Mon Sep 17 00:00:00 2001 From: Santosh Shrestha Date: Mon, 3 Jun 2024 17:10:18 -0500 Subject: [PATCH] handle stream error --- src/index.test.ts | 28 +++++++++++++++++++++++++++- src/index.ts | 8 +++++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/index.test.ts b/src/index.test.ts index 0a728db..53d346a 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -7,6 +7,7 @@ import * as mockSiteModel from './test-helpers/mock-site-model.json'; import { createMockKoopApp } from './test-helpers/create-mock-koop-app'; import { readableFromArray } from './test-helpers/stream-utils'; import { DcatUsError } from './dcat-us/dcat-us-error'; +import { PassThrough } from 'stream'; function buildPluginAndApp(feedTemplate, feedTemplateTransforms) { let Output; @@ -21,9 +22,16 @@ function buildPluginAndApp(feedTemplate, feedTemplateTransforms) { }; const app = createMockKoopApp(); + app.get('/dcat', function (req, res, next) { req.app.locals.feedTemplateTransforms = feedTemplateTransforms; res.locals.feedTemplate = feedTemplate; + app.use((err, _req, res, _next) => { + res.status(err.status || 500) + res.send({ + error: err.message + }) + }) next(); }, plugin.serve.bind(plugin)); @@ -61,7 +69,6 @@ describe('Output Plugin', () => { mockFetchSite = mocked(fetchSite); mockFetchSite.mockResolvedValue(mockSiteModel); - [plugin, app] = buildPluginAndApp(dcatTemplate, {}); }); @@ -137,6 +144,25 @@ describe('Output Plugin', () => { // TODO test stream error }); + it('returns error if stream emits an error', async () => { + const mockReadable = new PassThrough(); + + plugin.model.pullStream.mockResolvedValue(mockReadable); + const mockError = new Error('stream error') + + setTimeout(() => { + mockReadable.emit('error', mockError) + }, 200) + await request(app) + .get('/dcat') + .set('host', siteHostName) + .expect('Content-Type', /application\/json/) + .expect(500) + .expect((res) => { + expect(res.body).toEqual({ error: 'stream error' }); + }); + }); + it('returns 400 when searchRequest returns 400', async () => { [plugin, app] = buildPluginAndApp({}, {}); diff --git a/src/index.ts b/src/index.ts index 769488c..75c4094 100644 --- a/src/index.ts +++ b/src/index.ts @@ -47,9 +47,11 @@ export = class OutputDcatUs11 { const { stream: dcatStream } = getDataStreamDcatUs11(feedTemplate, feedTemplateTransforms); const datasetStream = await this.getDatasetStream(req); - datasetStream - .pipe(dcatStream) - .pipe(res); + datasetStream.on('error', (err) => { + if (req.next) { + req.next(err) + } + }).pipe(dcatStream).pipe(res); } catch (err) { res.status(err.statusCode).send(this.getErrorResponse(err));