Skip to content

Commit

Permalink
handle error in pass through stream
Browse files Browse the repository at this point in the history
  • Loading branch information
cedrics committed May 8, 2023
1 parent 1daeddc commit a554efb
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 11 deletions.
14 changes: 11 additions & 3 deletions build/helper/concurrentLimitRequester.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ export function concurrentLimitRequesterFactory(parameters) {
var queueItem = requestQueue.shift();
outstandingRequests++;
var requestId = generateRequestId();
console.log("Starting request " + requestId + " (" + outstandingRequests + "/" + concurrentLimit + "): " + queueItem.request.query.queryType);
console.log("Starting request " + requestId + " (" + outstandingRequests + "/" + concurrentLimit + "): " + queueItem.request.query.queryType + " from queue");
var stream = requester(queueItem.request);
var requestFinishedOnce = getOnceCallback(function () {
console.log("Request finished " + requestId);
console.log("Request finished " + requestId + " from Queue");
requestFinished();
});
stream.on('error', requestFinishedOnce);
stream.on('end', requestFinishedOnce);
queueItem.stream.on('error', function (error) {
requestFinishedOnce(function () {
console.log("Error on PassThrough for request " + requestId);
stream.emit('error', error);
});
});
pipeWithError(stream, queueItem.stream);
}
return function (request) {
Expand Down Expand Up @@ -56,10 +62,12 @@ export function concurrentLimitRequesterFactory(parameters) {
}
function getOnceCallback(callback) {
var called = false;
return function () {
return function (optionalCallback) {
if (optionalCallback === void 0) { optionalCallback = function () { }; }
if (!called) {
called = true;
callback();
optionalCallback();
}
};
}
14 changes: 11 additions & 3 deletions build/plywood.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,20 @@ var concurrentLimitRequesterFactory = exports.concurrentLimitRequesterFactory =
var queueItem = requestQueue.shift();
outstandingRequests++;
var requestId = generateRequestId();
console.log("Starting request " + requestId + " (" + outstandingRequests + "/" + concurrentLimit + "): " + queueItem.request.query.queryType);
console.log("Starting request " + requestId + " (" + outstandingRequests + "/" + concurrentLimit + "): " + queueItem.request.query.queryType + " from queue");
var stream = requester(queueItem.request);
var requestFinishedOnce = getOnceCallback(function () {
console.log("Request finished " + requestId);
console.log("Request finished " + requestId + " from Queue");
requestFinished();
});
stream.on('error', requestFinishedOnce);
stream.on('end', requestFinishedOnce);
queueItem.stream.on('error', function (error) {
requestFinishedOnce(function () {
console.log("Error on PassThrough for request " + requestId);
stream.emit('error', error);
});
});
pipeWithError(stream, queueItem.stream);
}
return function (request) {
Expand Down Expand Up @@ -201,10 +207,12 @@ var concurrentLimitRequesterFactory = exports.concurrentLimitRequesterFactory =
}
function getOnceCallback(callback) {
var called = false;
return function () {
return function (optionalCallback) {
if (optionalCallback === void 0) { optionalCallback = function () { }; }
if (!called) {
called = true;
callback();
optionalCallback();
}
};
}
Expand Down
20 changes: 15 additions & 5 deletions src/helper/concurrentLimitRequester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,23 @@ export function concurrentLimitRequesterFactory<T>(parameters: ConcurrentLimitRe
let queueItem = requestQueue.shift();
outstandingRequests++;
const requestId = generateRequestId();
console.log(`Starting request ${requestId} (${outstandingRequests}/${concurrentLimit}): ${(queueItem.request.query as any).queryType}`);
console.log(`Starting request ${requestId} (${outstandingRequests}/${concurrentLimit}): ${(queueItem.request.query as any).queryType} from queue`);

const stream = requester(queueItem.request);

const requestFinishedOnce = getOnceCallback(() => {
console.log(`Request finished ${requestId}`);
console.log(`Request finished ${requestId} from Queue`);
requestFinished();
});
stream.on('close', requestFinishedOnce)
stream.on('error', requestFinishedOnce);
stream.on('end', requestFinishedOnce);

queueItem.stream.on('error', (error) => {
requestFinishedOnce(() => {
console.log(`Error on PassThrough for request ${requestId}`)
stream.emit('error', error);
})
});

pipeWithError(stream, queueItem.stream);
}
Expand All @@ -76,7 +84,8 @@ export function concurrentLimitRequesterFactory<T>(parameters: ConcurrentLimitRe
console.log(`Request finished ${requestId}`);
requestFinished();
});
stream.on('close', requestFinishedOnce)
stream.on('error', requestFinishedOnce);
stream.on('end', requestFinishedOnce);

return stream;
} else {
Expand All @@ -93,10 +102,11 @@ export function concurrentLimitRequesterFactory<T>(parameters: ConcurrentLimitRe
function getOnceCallback(callback: () => void) {
let called = false;

return () => {
return (optionalCallback = () => {}) => {
if (!called) {
called = true;
callback();
optionalCallback();
}
};
}

0 comments on commit a554efb

Please sign in to comment.