Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: return 298 for filtered/dropped events in user transformer #2665

Merged
merged 4 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/constants/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ const MappedToDestinationKey = 'context.mappedToDestination';
const GENERIC_TRUE_VALUES = ['true', 'True', 'TRUE', 't', 'T', '1'];
const GENERIC_FALSE_VALUES = ['false', 'False', 'FALSE', 'f', 'F', '0'];

const HTTP_CUSTOM_STATUS_CODES = {
FILTERED: 298,
};

module.exports = {
EventType,
GENERIC_TRUE_VALUES,
Expand All @@ -58,4 +62,5 @@ module.exports = {
SpecedTraits,
TraitsMapping,
WhiteListedTraits,
HTTP_CUSTOM_STATUS_CODES,
};
2 changes: 1 addition & 1 deletion src/controllers/userTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
);
const events = ctx.request.body as ProcessorTransformationRequest[];
const processedRespone: UserTransformationServiceResponse =
await UserTransformService.transformRoutine(events);
await UserTransformService.transformRoutine(events, ctx.state.features);

Check warning on line 20 in src/controllers/userTransform.ts

View check run for this annotation

Codecov / codecov/patch

src/controllers/userTransform.ts#L20

Added line #L20 was not covered by tests
ctx.body = processedRespone.transformedEvents;
ControllerUtility.postProcess(ctx, processedRespone.retryStatus);
logger.debug(
Expand Down
49 changes: 49 additions & 0 deletions src/middlewares/featureFlag.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Context, Next } from 'koa';

export interface FeatureFlags {
[key: string]: boolean | string;
}

export const FEATURE_FILTER_CODE = 'filter-code';

export default class FeatureFlagMiddleware {
public static async handle(ctx: Context, next: Next): Promise<void> {

Check warning on line 10 in src/middlewares/featureFlag.ts

View check run for this annotation

Codecov / codecov/patch

src/middlewares/featureFlag.ts#L10

Added line #L10 was not covered by tests
// Initialize ctx.state.features if it doesn't exist
ctx.state.features = (ctx.state.features || {}) as FeatureFlags;

// Get headers from the request
const { headers } = ctx.request;

Check warning on line 15 in src/middlewares/featureFlag.ts

View check run for this annotation

Codecov / codecov/patch

src/middlewares/featureFlag.ts#L15

Added line #L15 was not covered by tests

// Filter headers that start with 'X-Feature-'
const featureHeaders = Object.keys(headers).filter((key) =>
key.toLowerCase().startsWith('x-feature-'),

Check warning on line 19 in src/middlewares/featureFlag.ts

View check run for this annotation

Codecov / codecov/patch

src/middlewares/featureFlag.ts#L18-L19

Added lines #L18 - L19 were not covered by tests
);

// Convert feature headers to feature flags in ctx.state.features
featureHeaders.forEach((featureHeader) => {

Check warning on line 23 in src/middlewares/featureFlag.ts

View check run for this annotation

Codecov / codecov/patch

src/middlewares/featureFlag.ts#L23

Added line #L23 was not covered by tests
// Get the feature name by removing the prefix, and convert to camelCase
const featureName = featureHeader

Check warning on line 25 in src/middlewares/featureFlag.ts

View check run for this annotation

Codecov / codecov/patch

src/middlewares/featureFlag.ts#L25

Added line #L25 was not covered by tests
.substring(10)
.replace(/X-Feature-/g, '')
.toLowerCase();

let value: string | boolean | undefined;
const valueString = headers[featureHeader] as string;

Check warning on line 31 in src/middlewares/featureFlag.ts

View check run for this annotation

Codecov / codecov/patch

src/middlewares/featureFlag.ts#L31

Added line #L31 was not covered by tests
if (valueString === 'true' || valueString === '?1') {
value = true;

Check warning on line 33 in src/middlewares/featureFlag.ts

View check run for this annotation

Codecov / codecov/patch

src/middlewares/featureFlag.ts#L33

Added line #L33 was not covered by tests
} else if (valueString === 'false' || valueString === '?0') {
value = false;
} else {
value = valueString;

Check warning on line 37 in src/middlewares/featureFlag.ts

View check run for this annotation

Codecov / codecov/patch

src/middlewares/featureFlag.ts#L35-L37

Added lines #L35 - L37 were not covered by tests
}

// Set the feature flag in ctx.state.features
if (value !== undefined) {
ctx.state.features[featureName] = value;

Check warning on line 42 in src/middlewares/featureFlag.ts

View check run for this annotation

Codecov / codecov/patch

src/middlewares/featureFlag.ts#L42

Added line #L42 was not covered by tests
}
});

// Move to the next middleware
await next();

Check warning on line 47 in src/middlewares/featureFlag.ts

View check run for this annotation

Codecov / codecov/patch

src/middlewares/featureFlag.ts#L47

Added line #L47 was not covered by tests
}
}
4 changes: 3 additions & 1 deletion src/routes/userTransform.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import Router from '@koa/router';
import RouteActivationController from '../middlewares/routeActivation';
import FeatureFlagController from '../middlewares/featureFlag';
import UserTransformController from '../controllers/userTransform';

const router = new Router();

router.post(
'/customTransform',
RouteActivationController.isUserTransformRouteActive,
FeatureFlagController.handle,
UserTransformController.transform,
);
router.post(
Expand All @@ -31,4 +33,4 @@ router.post(
);

const userTransformRoutes = router.routes();
export default userTransformRoutes;
export default userTransformRoutes;
81 changes: 55 additions & 26 deletions src/services/userTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
import { SUPPORTED_FUNC_NAMES } from '../util/ivmFactory';
import logger from '../logger';
import stats from '../util/stats';
import { CommonUtils } from '../util/common';
// eslint-disable-next-line @typescript-eslint/no-unused-vars
import { CatchErr, FixMe } from '../util/types';
import { FeatureFlags, FEATURE_FILTER_CODE } from '../middlewares/featureFlag';
import { HTTP_CUSTOM_STATUS_CODES } from '../constants';

export default class UserTransformService {
public static async transformRoutine(
events: ProcessorTransformationRequest[],
features: FeatureFlags = {},
): Promise<UserTransformationServiceResponse> {
let retryStatus = 200;
const groupedEvents: NonNullable<unknown> = groupBy(
Expand All @@ -41,12 +45,13 @@
);
}
const responses = await Promise.all<FixMe>(
Object.entries(groupedEvents).map(async ([dest, destEvents]) => {
logger.debug(`dest: ${dest}`);
Object.entries(groupedEvents).map(async ([, destEvents]) => {
const eventsToProcess = destEvents as ProcessorTransformationRequest[];
const transformationVersionId =
eventsToProcess[0]?.destination?.Transformations[0]?.VersionID;
const messageIds = eventsToProcess.map((ev) => ev.metadata?.messageId);
const messageIdsSet = new Set<string>(messageIds);
const messageIdsInOutputSet = new Set<string>();

const commonMetadata = {
sourceId: eventsToProcess[0]?.metadata?.sourceId,
Expand Down Expand Up @@ -80,31 +85,55 @@
transformationVersionId,
librariesVersionIDs,
);
transformedEvents.push(
...destTransformedEvents.map((ev) => {
if (ev.error) {
return {
statusCode: 400,
error: ev.error,
metadata: isEmpty(ev.metadata) ? commonMetadata : ev.metadata,
} as unknown as ProcessorTransformationResponse;
}
if (!isNonFuncObject(ev.transformedEvent)) {
return {
statusCode: 400,
error: `returned event in events from user transformation is not an object. transformationVersionId:${transformationVersionId} and returned event: ${JSON.stringify(
ev.transformedEvent,
)}`,
metadata: isEmpty(ev.metadata) ? commonMetadata : ev.metadata,
} as ProcessorTransformationResponse;
}
return {
output: ev.transformedEvent,

const transformedEventsWithMetadata: ProcessorTransformationResponse[] = [];
destTransformedEvents.forEach((ev) => {
if (ev.error) {
transformedEventsWithMetadata.push({

Check warning on line 92 in src/services/userTransform.ts

View check run for this annotation

Codecov / codecov/patch

src/services/userTransform.ts#L92

Added line #L92 was not covered by tests
statusCode: 400,
error: ev.error,
metadata: isEmpty(ev.metadata) ? commonMetadata : ev.metadata,
statusCode: 200,
} as ProcessorTransformationResponse;
}),
);
} as unknown as ProcessorTransformationResponse);
return;

Check warning on line 97 in src/services/userTransform.ts

View check run for this annotation

Codecov / codecov/patch

src/services/userTransform.ts#L97

Added line #L97 was not covered by tests
}
if (!isNonFuncObject(ev.transformedEvent)) {
transformedEventsWithMetadata.push({

Check warning on line 100 in src/services/userTransform.ts

View check run for this annotation

Codecov / codecov/patch

src/services/userTransform.ts#L100

Added line #L100 was not covered by tests
statusCode: 400,
error: `returned event in events from user transformation is not an object. transformationVersionId:${transformationVersionId} and returned event: ${JSON.stringify(
ev.transformedEvent,
)}`,
metadata: isEmpty(ev.metadata) ? commonMetadata : ev.metadata,
} as ProcessorTransformationResponse);
return;

Check warning on line 107 in src/services/userTransform.ts

View check run for this annotation

Codecov / codecov/patch

src/services/userTransform.ts#L107

Added line #L107 was not covered by tests
}
// add messageId to output set
if (ev.metadata?.messageId) {
messageIdsInOutputSet.add(ev.metadata.messageId);
} else if (ev.metadata?.messageIds) {
ev.metadata.messageIds.forEach((id) => messageIdsInOutputSet.add(id));

Check warning on line 113 in src/services/userTransform.ts

View check run for this annotation

Codecov / codecov/patch

src/services/userTransform.ts#L113

Added line #L113 was not covered by tests
}
transformedEventsWithMetadata.push({
output: ev.transformedEvent,
metadata: isEmpty(ev.metadata) ? commonMetadata : ev.metadata,
statusCode: 200,
} as ProcessorTransformationResponse);
});

if (features[FEATURE_FILTER_CODE]) {
// find difference between input and output messageIds
const messageIdsNotInOutput = CommonUtils.setDiff(messageIdsSet, messageIdsInOutputSet);
const droppedEvents = messageIdsNotInOutput.map((id) => ({
statusCode: HTTP_CUSTOM_STATUS_CODES.FILTERED,
metadata: {
...commonMetadata,
messageId: id,
messageIds: null,
},
}));
transformedEvents.push(...droppedEvents);
}

transformedEvents.push(...transformedEventsWithMetadata);
} catch (error: CatchErr) {
logger.error(error);
let status = 400;
Expand Down
4 changes: 4 additions & 0 deletions src/util/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const CommonUtils = {
}
return [obj];
},

setDiff(mainSet, comparisionSet) {
return [...mainSet].filter((item) => !comparisionSet.has(item));
},
};

module.exports = {
Expand Down
Loading
Loading