Skip to content

Commit

Permalink
fix(kafka): add status code & batched value in response structure (#2684
Browse files Browse the repository at this point in the history
)

* fix: add status code & batched value in response structure

* fix: check for erroneous events

* chore: improve test coverage
  • Loading branch information
ujjwal-ab authored Oct 5, 2023
1 parent e37d693 commit 99f39f5
Show file tree
Hide file tree
Showing 4 changed files with 1,416 additions and 1,295 deletions.
11 changes: 9 additions & 2 deletions src/v0/destinations/kafka/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const {
getIntegrationsObj,
getHashFromArray,
removeUndefinedAndNullValues,
getSuccessRespEvents,
getErrorRespEvents,
} = require('../../util');
// const { InstrumentationError } = require("../../util/errorTypes");

Expand Down Expand Up @@ -37,6 +39,10 @@ const filterConfigTopics = (message, destination) => {

const batch = (destEvents) => {
const respList = [];
if (!Array.isArray(destEvents) || destEvents.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}

// Grouping the events by topic
const groupedEvents = groupBy(destEvents, (event) => event.message.topic);
Expand All @@ -52,9 +58,10 @@ const batch = (destEvents) => {
metadata: events.map((event) => event.metadata),
destination: events[0].destination,
};
respList.push(response);
respList.push(
getSuccessRespEvents(response.batchedRequest, response.metadata, response.destination, true),
);
}

return respList;
};

Expand Down
Loading

0 comments on commit 99f39f5

Please sign in to comment.