Skip to content

Commit

Permalink
feat(observerhandler): wrapper to observer to emit once only
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Sep 12, 2018
1 parent 9770a95 commit 35d5976
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 40 deletions.
17 changes: 9 additions & 8 deletions src/adapters/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import { isString } from 'util';
import { oxidVersion } from '../metadata';
import { OxidResponse } from '../Response';
import { isArrayBuffer, isStream } from '../utils/base';
import { completeObservable } from '../utils/completeObservable';
import { createError, enhanceError } from '../utils/createError';
import { getObserverHandler } from '../utils/getObserverHandler';
import { buildURL } from '../utils/urls';

const isHttps = /https:?/;
Expand All @@ -21,6 +21,7 @@ const httpadapter = (config: any) =>
//TODO: prevent to error/next/complete if any occurred (wrapper fn)
//TODO: enhance check around !
new Observable((observer: Observer<any>) => {
const { emitError, emitComplete } = getObserverHandler(observer);
let data = config.data;
let headers = config.headers;

Expand All @@ -38,7 +39,7 @@ const httpadapter = (config: any) =>
} else if (isString(data)) {
data = Buffer.from(data, 'utf-8');
} else {
observer.error(
emitError(
createError('Data after transformation must be a string, an ArrayBuffer, a Buffer, or a Stream', config)
);
}
Expand Down Expand Up @@ -205,15 +206,15 @@ const httpadapter = (config: any) =>

if (config.responseType === 'stream') {
response.data = stream;
completeObservable(observer, response);
emitComplete(response);
} else {
const responseBuffer: Array<any> = [];
stream.on('data', function handleStreamData(chunk) {
responseBuffer.push(chunk);

// make sure the content length is not over the maxContentLength if specified
if (config.maxContentLength > -1 && Buffer.concat(responseBuffer).length > config.maxContentLength) {
observer.error(
emitError(
createError(
'maxContentLength size of ' + config.maxContentLength + ' exceeded',
config,
Expand All @@ -228,7 +229,7 @@ const httpadapter = (config: any) =>
if (req.aborted) {
return;
}
observer.error(enhanceError(err, config, null, lastRequest));
emitError(enhanceError(err, config, null, lastRequest));
});

stream.on('end', function handleStreamEnd() {
Expand All @@ -238,7 +239,7 @@ const httpadapter = (config: any) =>
}

response.data = responseData;
completeObservable(observer, response);
emitComplete(response);
});
}
});
Expand All @@ -248,12 +249,12 @@ const httpadapter = (config: any) =>
if (req.aborted) {
return;
}
observer.error(enhanceError(err, config, null, req));
emitError(enhanceError(err, config, null, req));
});

// Send the request
if (isStream(data)) {
data.on('error', err => observer.error(enhanceError(err, config, null, req))).pipe(req);
data.on('error', err => emitError(enhanceError(err, config, null, req))).pipe(req);
} else {
req.end(data);
}
Expand Down
32 changes: 0 additions & 32 deletions src/utils/completeObservable.ts

This file was deleted.

48 changes: 48 additions & 0 deletions src/utils/getObserverHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { Observer } from 'rxjs';
import { OxidResponse } from '../Response';
import { createError } from './createError';

const getObserverHandler = (observer: Observer<any>) => {
let handled = false;
return {
emitError: (err: any) => {
if (handled) {
return;
}
handled = true;
observer.error(err);
},

/**
* Error or next response based on status. Once response emitted via next,
* it'll complete as well.
*
* @param observer
* @param response
*/
emitComplete: <T = any>(response: OxidResponse<T>) => {
if (handled) {
return;
}
handled = true;
const validateStatus = response.config.validateStatus;

if (!validateStatus || validateStatus(response.status)) {
observer.next(response);
observer.complete();
} else {
observer.error(
createError(
`Request failed with status code ${response.status}`,
response.config,
null,
response.request,
response
)
);
}
}
};
};

export { getObserverHandler };

0 comments on commit 35d5976

Please sign in to comment.