Skip to content

Commit

Permalink
fix(mixpanel): batch event ordering (#2608)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gauravudia authored and Sanjay-Veernala committed Sep 19, 2023
1 parent 39c3e24 commit 3a79023
Show file tree
Hide file tree
Showing 10 changed files with 1,147 additions and 282 deletions.
2 changes: 1 addition & 1 deletion src/v0/destinations/iterable/transform.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const _ = require('lodash');
const get = require('get-value');
const {
batchEvents,
getCatalogEndpoint,
hasMultipleResponses,
pageEventPayloadBuilder,
Expand All @@ -23,6 +22,7 @@ const {
handleRtTfSingleEventError,
removeUndefinedAndNullValues,
getDestinationExternalIDInfoForRetl,
groupEventsByType: batchEvents,
} = require('../../util');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { mappingConfig, ConfigCategory } = require('./config');
Expand Down
34 changes: 0 additions & 34 deletions src/v0/destinations/iterable/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -736,41 +736,7 @@ const filterEventsAndPrepareBatchRequests = (transformedEvents) => {
return prepareBatchRequests(filteredEvents);
};

/**
* Groups events with the same message type together in batches.
* Each batch contains events that have the same message type and are from different users.
*
* @param {Array} inputs - An array of events
* @returns {Array} - An array of batches
*/
const batchEvents = (inputs) => {
const batches = [];
let currentInputsArray = inputs;
while (currentInputsArray.length > 0) {
const remainingInputsArray = [];
const userOrderTracker = {};
const event = currentInputsArray.shift();
const messageType = event.message.type;
const batch = [event];
currentInputsArray.forEach((currentInput) => {
const currentMessageType = currentInput.message.type;
const currentUser = currentInput.metadata.userId;
if (currentMessageType === messageType && !userOrderTracker[currentUser]) {
batch.push(currentInput);
} else {
remainingInputsArray.push(currentInput);
userOrderTracker[currentUser] = true;
}
});
batches.push(batch);
currentInputsArray = remainingInputsArray;
}

return batches;
};

module.exports = {
batchEvents,
getCatalogEndpoint,
hasMultipleResponses,
pageEventPayloadBuilder,
Expand Down
98 changes: 0 additions & 98 deletions src/v0/destinations/iterable/util.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const {
batchEvents,
pageEventPayloadBuilder,
trackEventPayloadBuilder,
screenEventPayloadBuilder,
Expand Down Expand Up @@ -108,103 +107,6 @@ const getTestEcommMessage = () => {
return message;
};

const orderEventsFuncData = [
{
inputs: [
{ message: { type: 'identify' }, metadata: { userId: '1' } },
{ message: { type: 'track' }, metadata: { userId: '1' } },
{ message: { type: 'identify' }, metadata: { userId: '2' } },
{ message: { type: 'track' }, metadata: { userId: '2' } },
{ message: { type: 'track' }, metadata: { userId: '2' } },
{ message: { type: 'identify' }, metadata: { userId: '3' } },
{ message: { type: 'track' }, metadata: { userId: '3' } },
{ message: { type: 'identify' }, metadata: { userId: '4' } },
{ message: { type: 'track' }, metadata: { userId: '4' } },
{ message: { type: 'identify' }, metadata: { userId: '5' } },
{ message: { type: 'track' }, metadata: { userId: '5' } },
{ message: { type: 'identify' }, metadata: { userId: '5' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
],
output: [
[
{ message: { type: 'identify' }, metadata: { userId: '1' } },
{ message: { type: 'identify' }, metadata: { userId: '2' } },
{ message: { type: 'identify' }, metadata: { userId: '3' } },
{ message: { type: 'identify' }, metadata: { userId: '4' } },
{ message: { type: 'identify' }, metadata: { userId: '5' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
],
[
{ message: { type: 'track' }, metadata: { userId: '1' } },
{ message: { type: 'track' }, metadata: { userId: '2' } },
{ message: { type: 'track' }, metadata: { userId: '2' } },
{ message: { type: 'track' }, metadata: { userId: '3' } },
{ message: { type: 'track' }, metadata: { userId: '4' } },
{ message: { type: 'track' }, metadata: { userId: '5' } },
],
[
{ message: { type: 'identify' }, metadata: { userId: '5' } },
]
]
},
{
inputs: [
{ message: { type: 'track' }, metadata: { userId: '3' } },
{ message: { type: 'identify' }, metadata: { userId: '3' } },
{ message: { type: 'identify' }, metadata: { userId: '1' } },
{ message: { type: 'track' }, metadata: { userId: '1' } },
{ message: { type: 'identify' }, metadata: { userId: '5' } },
{ message: { type: 'track' }, metadata: { userId: '5' } },
{ message: { type: 'identify' }, metadata: { userId: '5' } },
{ message: { type: 'identify' }, metadata: { userId: '2' } },
{ message: { type: 'track' }, metadata: { userId: '2' } },
{ message: { type: 'track' }, metadata: { userId: '2' } },
{ message: { type: 'track' }, metadata: { userId: '4' } },
{ message: { type: 'identify' }, metadata: { userId: '4' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
],
output: [
[
{ message: { type: 'track' }, metadata: { userId: '3' } },
{ message: { type: 'track' }, metadata: { userId: '4' } },
],
[
{ message: { type: 'identify' }, metadata: { userId: '3' } },
{ message: { type: 'identify' }, metadata: { userId: '1' } },
{ message: { type: 'identify' }, metadata: { userId: '5' } },
{ message: { type: 'identify' }, metadata: { userId: '2' } },
{ message: { type: 'identify' }, metadata: { userId: '4' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
{ message: { type: 'identify' }, metadata: { userId: '6' } },
],
[
{ message: { type: 'track' }, metadata: { userId: '1' } },
{ message: { type: 'track' }, metadata: { userId: '5' } },
{ message: { type: 'track' }, metadata: { userId: '2' } },
{ message: { type: 'track' }, metadata: { userId: '2' } },
],
[
{ message: { type: 'identify' }, metadata: { userId: '5' } },
]
]
}
]

describe('Order Event Tests', () => {
it('Order Event func tests', () => {
orderEventsFuncData.forEach((data) => {
expect(batchEvents(data.inputs)).toEqual(data.output);
})
});
});

describe('iterable utils test', () => {
describe('Unit test cases for iterable registerDeviceTokenEventPayloadBuilder', () => {
it('for no device type', async () => {
Expand Down
172 changes: 56 additions & 116 deletions src/v0/destinations/mp/transform.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const _ = require('lodash');
const get = require('get-value');
const { EventType } = require('../../../constants');
const {
Expand All @@ -14,9 +15,7 @@ const {
getFieldValueFromMessage,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
batchMultiplexedEvents,
getSuccessRespEvents,
defaultBatchRequestConfig,
groupEventsByType,
} = require('../../util');
const {
ConfigCategory,
Expand All @@ -32,6 +31,8 @@ const {
createIdentifyResponse,
isImportAuthCredentialsAvailable,
combineBatchRequestsWithSameJobIds,
groupEventsByEndpoint,
batchEvents,
} = require('./util');
const { InstrumentationError, ConfigurationError } = require('../../util/errorTypes');
const { CommonUtils } = require('../../../util/common');
Expand Down Expand Up @@ -378,29 +379,7 @@ const processGroupEvents = (message, type, destination) => {
return returnValue;
};

const generateBatchedPayloadForArray = (events) => {
const { batchedRequest } = defaultBatchRequestConfig();
const batchResponseList = events.flatMap((event) => JSON.parse(event.body.JSON_ARRAY.batch));
batchedRequest.body.JSON_ARRAY = { batch: JSON.stringify(batchResponseList) };
batchedRequest.endpoint = events[0].endpoint;
batchedRequest.headers = events[0].headers;
batchedRequest.params = events[0].params;
return batchedRequest;
};

const batchEvents = (successRespList, maxBatchSize) => {
const batchResponseList = [];
const batchedEvents = batchMultiplexedEvents(successRespList, maxBatchSize);
batchedEvents.forEach((batch) => {
const batchedRequest = generateBatchedPayloadForArray(batch.events);
batchResponseList.push(
getSuccessRespEvents(batchedRequest, batch.metadata, batch.destination, true),
);
});
return batchResponseList;
};

const processSingleMessage = async (message, destination) => {
const processSingleMessage = (message, destination) => {
const clonedMessage = { ...message };
if (clonedMessage.userId) {
clonedMessage.userId = String(clonedMessage.userId);
Expand Down Expand Up @@ -431,74 +410,7 @@ const processSingleMessage = async (message, destination) => {
}
};

const process = async (event) => processSingleMessage(event.message, event.destination);

const processEvents = async (inputs, reqMetadata) =>
await Promise.all(
inputs.map(async (event) => {
try {
if (event.message.statusCode) {
// already transformed event
return { output: event };
}

// if not transformed
return {
output: {
message: await process(event),
metadata: event.metadata,
destination: event.destination,
},
};
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
return { error: errRespEvent };
}
}),
);

const processAndChunkEvents = async (inputs, reqMetadata) => {
const processedEvents = await processEvents(inputs, reqMetadata);
const engageEventChunks = [];
const groupsEventChunks = [];
const trackEventChunks = [];
const importEventChunks = [];
const batchErrorRespList = [];
processedEvents.forEach((result) => {
if (result.output) {
const event = result.output;
const { destination, metadata } = event;
let { message } = event;
message = CommonUtils.toArray(message);
message.forEach((msg) => {
// eslint-disable-next-line default-case
switch (true) {
case msg.endpoint.includes('engage'):
engageEventChunks.push({ message: msg, destination, metadata });
break;
case msg.endpoint.includes('groups'):
groupsEventChunks.push({ message: msg, destination, metadata });
break;
case msg.endpoint.includes('track'):
trackEventChunks.push({ message: msg, destination, metadata });
break;
case msg.endpoint.includes('import'):
importEventChunks.push({ message: msg, destination, metadata });
break;
}
});
} else if (result.error) {
batchErrorRespList.push(result.error);
}
});
return {
engageEventChunks,
groupsEventChunks,
trackEventChunks,
importEventChunks,
batchErrorRespList,
};
};
const process = (event) => processSingleMessage(event.message, event.destination);

// Documentation about how Mixpanel handles the utm parameters
// Ref: https://help.mixpanel.com/hc/en-us/articles/115004613766-Default-Properties-Collected-by-Mixpanel
Expand All @@ -509,28 +421,56 @@ const processRouterDest = async (inputs, reqMetadata) => {
return errorRespEvents;
}

const {
engageEventChunks,
groupsEventChunks,
trackEventChunks,
importEventChunks,
batchErrorRespList,
} = await processAndChunkEvents(inputs, reqMetadata);

const engageRespList = batchEvents(engageEventChunks, ENGAGE_MAX_BATCH_SIZE);
const groupsRespList = batchEvents(groupsEventChunks, GROUPS_MAX_BATCH_SIZE);
const trackRespList = batchEvents(trackEventChunks, TRACK_MAX_BATCH_SIZE);
const importRespList = batchEvents(importEventChunks, IMPORT_MAX_BATCH_SIZE);

let batchSuccessRespList = [
...engageRespList,
...groupsRespList,
...trackRespList,
...importRespList,
];
batchSuccessRespList = combineBatchRequestsWithSameJobIds(batchSuccessRespList);

return [...batchSuccessRespList, ...batchErrorRespList];
const groupedEvents = groupEventsByType(inputs);
const response = await Promise.all(
groupedEvents.map(async (listOfEvents) => {
let transformedPayloads = await Promise.all(
listOfEvents.map(async (event) => {
try {
if (event.message.statusCode) {
// already transformed event
return {
message: event.message,
metadata: event.metadata,
destination: event.destination,
};
}

let processedEvents = await process(event);
processedEvents = CommonUtils.toArray(processedEvents);
return processedEvents.map((response) => ({
message: response,
metadata: event.metadata,
destination: event.destination,
}));
} catch (error) {
return handleRtTfSingleEventError(event, error, reqMetadata);
}
}),
);

transformedPayloads = _.flatMap(transformedPayloads);
const { engageEvents, groupsEvents, trackEvents, importEvents, batchErrorRespList } =
groupEventsByEndpoint(transformedPayloads);

const engageRespList = batchEvents(engageEvents, ENGAGE_MAX_BATCH_SIZE);
const groupsRespList = batchEvents(groupsEvents, GROUPS_MAX_BATCH_SIZE);
const trackRespList = batchEvents(trackEvents, TRACK_MAX_BATCH_SIZE);
const importRespList = batchEvents(importEvents, IMPORT_MAX_BATCH_SIZE);
const batchSuccessRespList = [
...engageRespList,
...groupsRespList,
...trackRespList,
...importRespList,
];

return [...batchSuccessRespList, ...batchErrorRespList];
}),
);

// Flatten the response array containing batched events from multiple groups
const allBatchedEvents = _.flatMap(response);
return combineBatchRequestsWithSameJobIds(allBatchedEvents);
};

module.exports = { process, processRouterDest };
Loading

0 comments on commit 3a79023

Please sign in to comment.