Skip to content

Commit

Permalink
feat(integrations): introduced new status codes to suppress or filter…
Browse files Browse the repository at this point in the history
… events (#2611)

* feat(integrations): introduced new actions to suppress or filter events

* feat(integrations): introduced new actions to suppress or filter events

* chore: klaviyo multiple response action fix

* chore: statuscode changes

* chore: introduce new statuscode for filter and suppress events

* chore: code review changes

* chore: added support of server<>transformer cross compatibility

* chore: code review changes

* chore: added new status code tests for braze

* chore: added new tests for both transformer versions - with status code and without status code

* chore: code review changes

* chore: code review changes

* chore: code review changes
  • Loading branch information
mihir-4116 authored Oct 9, 2023
1 parent bbf7ad4 commit 6bdb01e
Show file tree
Hide file tree
Showing 23 changed files with 1,927 additions and 1,263 deletions.
6 changes: 5 additions & 1 deletion src/routes/destination.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Router from '@koa/router';
import DestinationController from '../controllers/destination';
import RegulationController from '../controllers/regulation';
import FeatureFlagController from '../middlewares/featureFlag';
import RouteActivationController from '../middlewares/routeActivation';

const router = new Router();
Expand All @@ -9,22 +10,25 @@ router.post(
'/:version/destinations/:destination',
RouteActivationController.isDestinationRouteActive,
RouteActivationController.destinationProcFilter,
FeatureFlagController.handle,
DestinationController.destinationTransformAtProcessor,
);
router.post(
'/routerTransform',
RouteActivationController.isDestinationRouteActive,
RouteActivationController.destinationRtFilter,
FeatureFlagController.handle,
DestinationController.destinationTransformAtRouter,
);
router.post(
'/batch',
RouteActivationController.isDestinationRouteActive,
RouteActivationController.destinationBatchFilter,
FeatureFlagController.handle,
DestinationController.batchProcess,
);

router.post('/deleteUsers', RegulationController.deleteUsers);

const destinationRoutes = router.routes();
export default destinationRoutes;
export default destinationRoutes;
15 changes: 9 additions & 6 deletions src/services/destination/nativeIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ export default class NativeIntegrationDestinationService implements IntegrationD
events: ProcessorTransformationRequest[],
destinationType: string,
version: string,
_requestMetadata: NonNullable<unknown>,
requestMetadata: NonNullable<unknown>,
): Promise<ProcessorTransformationResponse[]> {
const destHandler = FetchHandler.getDestHandler(destinationType, version);
const respList: ProcessorTransformationResponse[][] = await Promise.all(
events.map(async (event) => {
try {
const transformedPayloads:
| ProcessorTransformationOutput
| ProcessorTransformationOutput[] = await destHandler.process(event);
| ProcessorTransformationOutput[] = await destHandler.process(event, requestMetadata);
return DestinationPostTransformationService.handleProcessorTransformSucessEvents(
event,
transformedPayloads,
Expand Down Expand Up @@ -88,7 +88,7 @@ export default class NativeIntegrationDestinationService implements IntegrationD
events: RouterTransformationRequestData[],
destinationType: string,
version: string,
_requestMetadata: NonNullable<unknown>,
requestMetadata: NonNullable<unknown>,
): Promise<RouterTransformationResponse[]> {
const destHandler = FetchHandler.getDestHandler(destinationType, version);
const allDestEvents: NonNullable<unknown> = groupBy(
Expand All @@ -106,7 +106,7 @@ export default class NativeIntegrationDestinationService implements IntegrationD
);
try {
const doRouterTransformationResponse: RouterTransformationResponse[] =
await destHandler.processRouterDest(cloneDeep(destInputArray));
await destHandler.processRouterDest(cloneDeep(destInputArray), requestMetadata);
metaTO.metadata = destInputArray[0].metadata;
return DestinationPostTransformationService.handleRouterTransformSuccessEvents(
doRouterTransformationResponse,
Expand All @@ -132,7 +132,7 @@ export default class NativeIntegrationDestinationService implements IntegrationD
events: RouterTransformationRequestData[],
destinationType: string,
version: any,
_requestMetadata: NonNullable<unknown>,
requestMetadata: NonNullable<unknown>,
): RouterTransformationResponse[] {
const destHandler = FetchHandler.getDestHandler(destinationType, version);
if (!destHandler.batch) {
Expand All @@ -145,7 +145,10 @@ export default class NativeIntegrationDestinationService implements IntegrationD
const groupedEvents: RouterTransformationRequestData[][] = Object.values(allDestEvents);
const response = groupedEvents.map((destEvents) => {
try {
const destBatchedRequests: RouterTransformationResponse[] = destHandler.batch(destEvents);
const destBatchedRequests: RouterTransformationResponse[] = destHandler.batch(
destEvents,
requestMetadata,
);
return destBatchedRequests;
} catch (error: any) {
const metaTO = this.getTags(
Expand Down
1 change: 1 addition & 0 deletions src/services/misc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export default class MiscService {
return {
namespace: 'Unknown',
cluster: 'Unknown',
features: ctx.state?.features || {},
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/util/errorNotifier/bugsnag.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
UnhandledStatusCodeError,
UnauthorizedError,
NetworkInstrumentationError,
FilteredEventsError,
} = require('../../v0/util/errorTypes');

const {
Expand All @@ -48,6 +49,7 @@ const errorTypesDenyList = [
NetworkInstrumentationError,
CDKCustomError,
DataValidationError,
FilteredEventsError,
];

const pathsDenyList = [
Expand Down
22 changes: 19 additions & 3 deletions src/v0/destinations/braze/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ const {
isHttpStatusSuccess,
simpleProcessRouterDestSync,
simpleProcessRouterDest,
isNewStatusCodesAccepted,
} = require('../../util');
const { InstrumentationError, NetworkError } = require('../../util/errorTypes');
const {
InstrumentationError,
NetworkError,
FilteredEventsError,
} = require('../../util/errorTypes');
const {
ConfigCategory,
mappingConfig,
Expand Down Expand Up @@ -223,7 +228,13 @@ async function processIdentify(message, destination) {
}
}

function processTrackWithUserAttributes(message, destination, mappingJson, processParams) {
function processTrackWithUserAttributes(
message,
destination,
mappingJson,
processParams,
reqMetadata,
) {
let payload = getUserAttributesObject(message, mappingJson);
if (payload && Object.keys(payload).length > 0) {
payload = setExternalIdOrAliasObject(payload, message);
Expand All @@ -236,6 +247,10 @@ function processTrackWithUserAttributes(message, destination, mappingJson, proce
);
if (dedupedAttributePayload) {
requestJson.attributes = [dedupedAttributePayload];
} else if (isNewStatusCodesAccepted(reqMetadata)) {
throw new FilteredEventsError(
'[Braze Deduplication]: Duplicate user detected, the user is dropped',
);
} else {
throw new InstrumentationError(
'[Braze Deduplication]: Duplicate user detected, the user is dropped',
Expand Down Expand Up @@ -444,7 +459,7 @@ function processAlias(message, destination) {
);
}

async function process(event, processParams = { userStore: new Map() }) {
async function process(event, processParams = { userStore: new Map() }, reqMetadata = {}) {
let response;
const { message, destination } = event;
const messageType = message.type.toLowerCase();
Expand Down Expand Up @@ -490,6 +505,7 @@ async function process(event, processParams = { userStore: new Map() }) {
destination,
mappingConfig[category.name],
processParams,
reqMetadata,
);
break;
case EventType.GROUP:
Expand Down
9 changes: 8 additions & 1 deletion src/v0/destinations/braze/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const {
ALIAS_BRAZE_MAX_REQ_COUNT,
TRACK_BRAZE_MAX_REQ_COUNT,
} = require('./config');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { JSON_MIME_TYPE, HTTP_STATUS_CODES } = require('../../util/constant');
const { isObject } = require('../../util');
const { removeUndefinedValues, getIntegrationsObj } = require('../../util');
const { InstrumentationError } = require('../../util/errorTypes');
Expand Down Expand Up @@ -363,11 +363,14 @@ const processBatch = (transformedEvents) => {
const purchaseArray = [];
const successMetadata = [];
const failureResponses = [];
const filteredResponses = [];
const subscriptionsArray = [];
const mergeUsersArray = [];
for (const transformedEvent of transformedEvents) {
if (!isHttpStatusSuccess(transformedEvent?.statusCode)) {
failureResponses.push(transformedEvent);
} else if (transformedEvent?.statusCode === HTTP_STATUS_CODES.FILTER_EVENTS) {
filteredResponses.push(transformedEvent);
} else if (transformedEvent?.batchedRequest?.body?.JSON) {
const { attributes, events, purchases, subscription_groups, merge_updates } =
transformedEvent.batchedRequest.body.JSON;
Expand Down Expand Up @@ -446,6 +449,10 @@ const processBatch = (transformedEvents) => {
finalResponse.push(...failureResponses);
}

if (filteredResponses.length > 0) {
finalResponse.push(...filteredResponses);
}

return finalResponse;
};

Expand Down
80 changes: 61 additions & 19 deletions src/v0/destinations/klaviyo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ const {
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
flattenJson,
isNewStatusCodesAccepted,
} = require('../../util');

const { ConfigurationError, InstrumentationError } = require('../../util/errorTypes');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { JSON_MIME_TYPE, HTTP_STATUS_CODES } = require('../../util/constant');

/**
* Main Identify request handler func
Expand All @@ -49,9 +49,10 @@ const { JSON_MIME_TYPE } = require('../../util/constant');
* @param {*} message
* @param {*} category
* @param {*} destination
* @param {*} reqMetadata
* @returns
*/
const identifyRequestHandler = async (message, category, destination) => {
const identifyRequestHandler = async (message, category, destination, reqMetadata) => {
// If listId property is present try to subscribe/member user in list
const { privateApiKey, enforceEmailAsPrimary, listId, flattenProperties } = destination.Config;
const mappedToDestination = get(message, MappedToDestinationKey);
Expand Down Expand Up @@ -105,17 +106,40 @@ const identifyRequestHandler = async (message, category, destination) => {
},
};

const profileId = await getIdFromNewOrExistingProfile(endpoint, payload, requestOptions);
const { profileId, response, statusCode } = await getIdFromNewOrExistingProfile(
endpoint,
payload,
requestOptions,
);

// Update Profile
const responseArray = [profileUpdateResponseBuilder(payload, profileId, category, privateApiKey)];
const responseMap = {
profileUpdateResponse: profileUpdateResponseBuilder(
payload,
profileId,
category,
privateApiKey,
),
};

// check if user wants to subscribe profile or not and listId is present or not
if (traitsInfo?.properties?.subscribe && (traitsInfo.properties?.listId || listId)) {
responseArray.push(subscribeUserToList(message, traitsInfo, destination));
return responseArray;
responseMap.subscribeUserToListResponse = subscribeUserToList(message, traitsInfo, destination);
}

if (isNewStatusCodesAccepted(reqMetadata) && statusCode === HTTP_STATUS_CODES.CREATED) {
responseMap.suppressEventResponse = {
...responseMap.profileUpdateResponse,
statusCode: HTTP_STATUS_CODES.SUPPRESS_EVENTS,
error: JSON.stringify(response),
};
return responseMap.subscribeUserToListResponse
? [responseMap.subscribeUserToListResponse]
: responseMap.suppressEventResponse;
}
return responseArray[0];

return responseMap.subscribeUserToListResponse
? [responseMap.profileUpdateResponse, responseMap.subscribeUserToListResponse]
: responseMap.profileUpdateResponse;
};

// ----------------------
Expand Down Expand Up @@ -241,7 +265,7 @@ const groupRequestHandler = (message, category, destination) => {
};

// Main event processor using specific handler funcs
const processEvent = async (message, destination) => {
const processEvent = async (message, destination, reqMetadata) => {
if (!message.type) {
throw new InstrumentationError('Event type is required');
}
Expand All @@ -255,7 +279,7 @@ const processEvent = async (message, destination) => {
switch (messageType) {
case EventType.IDENTIFY:
category = CONFIG_CATEGORIES.IDENTIFY;
response = await identifyRequestHandler(message, category, destination);
response = await identifyRequestHandler(message, category, destination, reqMetadata);
break;
case EventType.SCREEN:
case EventType.TRACK:
Expand All @@ -272,8 +296,8 @@ const processEvent = async (message, destination) => {
return response;
};

const process = async (event) => {
const result = await processEvent(event.message, event.destination);
const process = async (event, reqMetadata) => {
const result = await processEvent(event.message, event.destination, reqMetadata);
return result;
};

Expand Down Expand Up @@ -312,7 +336,7 @@ const processRouterDest = async (inputs, reqMetadata) => {
// if not transformed
getEventChunks(
{
message: await process(event),
message: await process(event, reqMetadata),
metadata: event.metadata,
destination,
},
Expand All @@ -326,13 +350,31 @@ const processRouterDest = async (inputs, reqMetadata) => {
}
}),
);
let batchedSubscribeResponseList = [];
const batchedSubscribeResponseList = [];
if (subscribeRespList.length > 0) {
batchedSubscribeResponseList = batchSubscribeEvents(subscribeRespList);
const batchedResponseList = batchSubscribeEvents(subscribeRespList);
batchedSubscribeResponseList.push(...batchedResponseList);
}
const nonSubscribeSuccessList = nonSubscribeRespList.map((resp) =>
getSuccessRespEvents(resp.message, [resp.metadata], resp.destination),
);
const nonSubscribeSuccessList = nonSubscribeRespList.map((resp) => {
const response = resp;
const { message, metadata, destination: eventDestination } = response;
if (
isNewStatusCodesAccepted(reqMetadata) &&
message?.statusCode &&
message.statusCode === HTTP_STATUS_CODES.SUPPRESS_EVENTS
) {
delete message.statusCode;
return getSuccessRespEvents(
message,
[metadata],
eventDestination,
false,
HTTP_STATUS_CODES.SUPPRESS_EVENTS,
);
}
return getSuccessRespEvents(message, [metadata], eventDestination);
});

batchResponseList = [...batchedSubscribeResponseList, ...nonSubscribeSuccessList];

return [...batchResponseList, ...batchErrorRespList];
Expand Down
Loading

0 comments on commit 6bdb01e

Please sign in to comment.