Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: changes for supporting record event in FB audience #3351

Merged
merged 17 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 277 additions & 0 deletions src/v0/destinations/fb_custom_audience/recordTransform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/* eslint-disable no-const-assign */
const lodash = require('lodash');
const get = require('get-value');
const {
InstrumentationError,
ConfigurationError,
getErrorRespEvents,
} = require('@rudderstack/integrations-lib');
const { schemaFields } = require('./config');
const { MappedToDestinationKey } = require('../../../constants');
const stats = require('../../../util/stats');
const {
getDestinationExternalIDInfoForRetl,
isDefinedAndNotNullAndNotEmpty,
checkSubsetOfArray,
returnArrayOfSubarrays,
getSuccessRespEvents,
generateErrorObject,
} = require('../../util');
const {
ensureApplicableFormat,
getUpdatedDataElement,
getSchemaForEventMappedToDest,
batchingWithPayloadSize,
responseBuilderSimple,
getDataSource,
} = require('./util');

function getErrorMetaData(inputs, acceptedOperations) {
const metadata = [];
// eslint-disable-next-line no-restricted-syntax
for (const key in inputs) {
if (!acceptedOperations.includes(key)) {
inputs[key].forEach((input) => {
metadata.push(input.metadata);
});
}
}
return metadata;
}

const processRecordEventArray = (
recordChunksArray,
userSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
operation,
operationAudienceId,
) => {
const toSendEvents = [];
const metadata = [];
recordChunksArray.forEach((recordArray) => {
const data = [];
recordArray.forEach((input) => {
const { fields } = input.message;
let dataElement = [];
let nullUserData = true;

userSchema.forEach((eachProperty) => {
const userProperty = fields[eachProperty];
let updatedProperty = userProperty;

if (isHashRequired && !disableFormat) {
updatedProperty = ensureApplicableFormat(eachProperty, userProperty);
}

dataElement = getUpdatedDataElement(
dataElement,
isHashRequired,
eachProperty,
updatedProperty,
);

if (dataElement[dataElement.length - 1]) {
nullUserData = false;
}
});

if (nullUserData) {
stats.increment('fb_custom_audience_event_having_all_null_field_values_for_a_user', {

Check warning on line 83 in src/v0/destinations/fb_custom_audience/recordTransform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/fb_custom_audience/recordTransform.js#L83

Added line #L83 was not covered by tests
destinationId: destination.ID,
nullFields: userSchema,
});
}
data.push(dataElement);
metadata.push(input.metadata);
});

const prepareFinalPayload = lodash.cloneDeep(paramsPayload);
prepareFinalPayload.schema = userSchema;
prepareFinalPayload.data = data;
const payloadBatches = batchingWithPayloadSize(prepareFinalPayload);
krishna2020 marked this conversation as resolved.
Show resolved Hide resolved

payloadBatches.forEach((payloadBatch) => {
const response = {
...prepareParams,
payload: payloadBatch,
};

const wrappedResponse = {
responseField: response,
operationCategory: operation,
};

const builtResponse = responseBuilderSimple(wrappedResponse, operationAudienceId);

toSendEvents.push(builtResponse);
});
});

const response = getSuccessRespEvents(toSendEvents, metadata, destination, true);

return response;
};

async function processRecordInputs(groupedRecordInputs) {
const { destination } = groupedRecordInputs[0];
const { message } = groupedRecordInputs[0];
const {
isHashRequired,
accessToken,
disableFormat,
type,
subType,
isRaw,
maxUserCount,
audienceId,
} = destination.Config;
const prepareParams = {
access_token: accessToken,
};

// maxUserCount validation
const maxUserCountNumber = parseInt(maxUserCount, 10);
if (Number.isNaN(maxUserCountNumber)) {
throw new ConfigurationError('Batch size must be an Integer.');

Check warning on line 139 in src/v0/destinations/fb_custom_audience/recordTransform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/fb_custom_audience/recordTransform.js#L139

Added line #L139 was not covered by tests
}

// audience id validation
let operationAudienceId = audienceId;
const mappedToDestination = get(message, MappedToDestinationKey);
if (!operationAudienceId && mappedToDestination) {
krishna2020 marked this conversation as resolved.
Show resolved Hide resolved
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE');
operationAudienceId = objectType;
}
if (!isDefinedAndNotNullAndNotEmpty(operationAudienceId)) {
throw new ConfigurationError('Audience ID is a mandatory field');

Check warning on line 150 in src/v0/destinations/fb_custom_audience/recordTransform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/fb_custom_audience/recordTransform.js#L150

Added line #L150 was not covered by tests
}

// user schema validation
let { userSchema } = destination.Config;
if (mappedToDestination) {
userSchema = getSchemaForEventMappedToDest(message);
krishna2020 marked this conversation as resolved.
Show resolved Hide resolved
}
if (!Array.isArray(userSchema)) {
userSchema = [userSchema];

Check warning on line 159 in src/v0/destinations/fb_custom_audience/recordTransform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/fb_custom_audience/recordTransform.js#L159

Added line #L159 was not covered by tests
}
if (!checkSubsetOfArray(schemaFields, userSchema)) {
throw new ConfigurationError('One or more of the schema fields are not supported');

Check warning on line 162 in src/v0/destinations/fb_custom_audience/recordTransform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/fb_custom_audience/recordTransform.js#L162

Added line #L162 was not covered by tests
}

const paramsPayload = {};

if (isRaw) {
paramsPayload.is_raw = isRaw;

Check warning on line 168 in src/v0/destinations/fb_custom_audience/recordTransform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/fb_custom_audience/recordTransform.js#L168

Added line #L168 was not covered by tests
}

const dataSource = getDataSource(type, subType);
if (Object.keys(dataSource).length > 0) {
paramsPayload.data_source = dataSource;

Check warning on line 173 in src/v0/destinations/fb_custom_audience/recordTransform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/fb_custom_audience/recordTransform.js#L173

Added line #L173 was not covered by tests
}

const groupedRecordsByAction = lodash.groupBy(groupedRecordInputs, (record) =>
record.message.action?.toLowerCase(),
);

const finalResponse = [];

let insertResponse;
let deleteResponse;
let updateResponse;

if (groupedRecordsByAction.delete) {
const deleteRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.delete,
maxUserCountNumber,
);
deleteResponse = processRecordEventArray(
deleteRecordChunksArray,
userSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'remove',
operationAudienceId,
);
}

if (groupedRecordsByAction.insert) {
const insertRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.insert,
maxUserCountNumber,
);

insertResponse = processRecordEventArray(
insertRecordChunksArray,
userSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'add',
operationAudienceId,
);
}

if (groupedRecordsByAction.update) {
const updateRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.update,
maxUserCountNumber,
);
updateResponse = processRecordEventArray(
updateRecordChunksArray,
userSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'add',
operationAudienceId,
);
}

const eventTypes = ['update', 'insert', 'delete'];
const errorMetaData = [];
const errorMetaDataObject = getErrorMetaData(groupedRecordsByAction, eventTypes);
if (errorMetaDataObject.length > 0) {
errorMetaData.push(errorMetaDataObject);
}

const error = new InstrumentationError('Invalid action type in record event');
const errorObj = generateErrorObject(error);
const errorResponseList = errorMetaData.map((metadata) =>
getErrorRespEvents(metadata, errorObj.status, errorObj.message, errorObj.statTags),
);

if (deleteResponse && deleteResponse.batchedRequest.length > 0) {
finalResponse.push(deleteResponse);
}
if (insertResponse && insertResponse.batchedRequest.length > 0) {
finalResponse.push(insertResponse);
}
if (updateResponse && updateResponse.batchedRequest.length > 0) {
finalResponse.push(updateResponse);
}
if (errorResponseList.length > 0) {
finalResponse.push(...errorResponseList);
}

if (finalResponse.length === 0) {
throw new InstrumentationError(

Check warning on line 268 in src/v0/destinations/fb_custom_audience/recordTransform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/fb_custom_audience/recordTransform.js#L268

Added line #L268 was not covered by tests
'Missing valid parameters, unable to generate transformed payload',
);
}
return finalResponse;
}

module.exports = {
processRecordInputs,
};
Loading
Loading