Skip to content

Commit 0b6bbbb

Browse files
sdangol and dreamorosi authored
feat(parser): integrate parser with Batch Processing (#4408)
Co-authored-by: Andrea Amorosi <dreamorosi@gmail.com>
1 parent e21f0cf commit 0b6bbbb

File tree

9 files changed

+586
-20
lines changed

9 files changed

+586
-20
lines changed

package-lock.json

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
"packages/tracer",
1111
"packages/parameters",
1212
"packages/idempotency",
13+
"packages/parser",
1314
"packages/batch",
1415
"packages/testing",
15-
"packages/parser",
1616
"examples/snippets",
1717
"layers",
1818
"examples/app",

packages/batch/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
"nodejs"
7474
],
7575
"devDependencies": {
76-
"@aws-lambda-powertools/testing-utils": "file:../testing"
76+
"@aws-lambda-powertools/testing-utils": "file:../testing",
77+
"@aws-lambda-powertools/parser": "2.25.2"
7778
}
7879
}

packages/batch/src/BasePartialBatchProcessor.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { StandardSchemaV1 } from '@standard-schema/spec';
12
import type {
23
DynamoDBRecord,
34
KinesisStreamRecord,
@@ -11,6 +12,7 @@ import {
1112
} from './constants.js';
1213
import { FullBatchFailureError } from './errors.js';
1314
import type {
15+
BasePartialBatchProcessorConfig,
1416
EventSourceDataClassTypes,
1517
PartialItemFailureResponse,
1618
PartialItemFailures,
@@ -42,12 +44,20 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
4244
*/
4345
public eventType: keyof typeof EventType;
4446

47+
/**
48+
* The schema of the body of the event record for parsing
49+
*/
50+
protected schema?: StandardSchemaV1;
51+
4552
/**
4653
* Initializes base batch processing class
4754
*
4855
* @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
4956
*/
50-
public constructor(eventType: keyof typeof EventType) {
57+
public constructor(
58+
eventType: keyof typeof EventType,
59+
config?: BasePartialBatchProcessorConfig
60+
) {
5161
super();
5262
this.eventType = eventType;
5363
this.batchResponse = DEFAULT_RESPONSE;
@@ -56,6 +66,9 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
5666
[EventType.KinesisDataStreams]: () => this.collectKinesisFailures(),
5767
[EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(),
5868
};
69+
if (config) {
70+
this.schema = config.schema;
71+
}
5972
}
6073

6174
/**

packages/batch/src/BasePartialProcessor.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ abstract class BasePartialProcessor {
7474
* This method should be called when a record fails processing so that
7575
* the processor can keep track of the error and the record that failed.
7676
*
77-
* @param record Record that failed processing
78-
* @param error Error that was thrown
77+
* @param record - Record that failed processing
78+
* @param error - Error that was thrown
7979
*/
8080
public failureHandler(
8181
record: EventSourceDataClassTypes,
@@ -131,7 +131,7 @@ abstract class BasePartialProcessor {
131131
* This is to ensure that the processor keeps track of the results and the records
132132
* that succeeded and failed processing.
133133
*
134-
* @param record Record to be processed
134+
* @param record - Record to be processed
135135
*/
136136
public abstract processRecord(
137137
record: BaseRecord
@@ -149,7 +149,7 @@ abstract class BasePartialProcessor {
149149
* This is to ensure that the processor keeps track of the results and the records
150150
* that succeeded and failed processing.
151151
*
152-
* @param record Record to be processed
152+
* @param record - Record to be processed
153153
*/
154154
public abstract processRecordSync(
155155
record: BaseRecord
@@ -198,9 +198,9 @@ abstract class BasePartialProcessor {
198198
* to allow for reusing the processor instance across multiple invocations
199199
* by instantiating the processor outside of the Lambda function handler.
200200
*
201-
* @param records Array of records to be processed
202-
* @param handler CallableFunction to process each record from the batch
203-
* @param options Options to be used during processing (optional)
201+
* @param records - Array of records to be processed
202+
* @param handler - CallableFunction to process each record from the batch
203+
* @param options - Options to be used during processing (optional)
204204
*/
205205
public register(
206206
records: BaseRecord[],
@@ -223,8 +223,8 @@ abstract class BasePartialProcessor {
223223
* This method should be called when a record succeeds processing so that
224224
* the processor can keep track of the result and the record that succeeded.
225225
*
226-
* @param record Record that succeeded processing
227-
* @param result Result from record handler
226+
* @param record - Record that succeeded processing
227+
* @param result - Result from record handler
228228
*/
229229
public successHandler(
230230
record: EventSourceDataClassTypes,

packages/batch/src/BatchProcessor.ts

Lines changed: 174 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1+
import type { StandardSchemaV1 } from '@standard-schema/spec';
2+
import type { StreamRecord } from 'aws-lambda';
13
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
4+
import { EventType, SchemaVendor } from './constants.js';
25
import { BatchProcessingError } from './errors.js';
3-
import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
6+
import type {
7+
BaseRecord,
8+
EventSourceDataClassTypes,
9+
FailureResponse,
10+
SuccessResponse,
11+
} from './types.js';
412

513
/**
614
* Process records in a batch asynchronously and handle partial failure cases.
@@ -79,7 +87,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
7987
* });
8088
* ```
8189
*
82-
* @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
90+
* @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
8391
*/
8492
class BatchProcessor extends BasePartialBatchProcessor {
8593
/**
@@ -94,13 +102,17 @@ class BatchProcessor extends BasePartialBatchProcessor {
94102
* If the handler function completes successfully, the method returns a success response.
95103
* Otherwise, it returns a failure response with the error that occurred during processing.
96104
*
97-
* @param record The record to be processed
105+
* @param record - The record to be processed
98106
*/
99107
public async processRecord(
100108
record: BaseRecord
101109
): Promise<SuccessResponse | FailureResponse> {
102110
try {
103-
const data = this.toBatchType(record, this.eventType);
111+
const recordToProcess =
112+
this.schema == null
113+
? record
114+
: await this.#parseRecord(record, this.eventType, this.schema);
115+
const data = this.toBatchType(recordToProcess, this.eventType);
104116
const result = await this.handler(data, this.options?.context);
105117

106118
return this.successHandler(record, result);
@@ -112,7 +124,7 @@ class BatchProcessor extends BasePartialBatchProcessor {
112124
/**
113125
* @throws {BatchProcessingError} This method is not implemented for synchronous processing.
114126
*
115-
* @param _record The record to be processed
127+
* @param _record - The record to be processed
116128
*/
117129
public processRecordSync(
118130
_record: BaseRecord
@@ -121,6 +133,163 @@ class BatchProcessor extends BasePartialBatchProcessor {
121133
'Not implemented. Use asyncProcess() instead.'
122134
);
123135
}
136+
137+
/**
138+
* Extend the schema according to the event type passed.
139+
*
140+
* If useTransformers is true, extend using opinionated transformers.
141+
* Otherwise, extend without any transformers.
142+
*
143+
* @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
144+
* @param schema - The StandardSchema to be used for parsing
145+
* @param useTransformers - Whether to use transformers for parsing
146+
*/
147+
async #createExtendedSchema(options: {
148+
eventType: keyof typeof EventType;
149+
schema: StandardSchemaV1;
150+
useTransformers: boolean;
151+
}) {
152+
const { eventType, schema, useTransformers } = options;
153+
switch (eventType) {
154+
case EventType.SQS: {
155+
if (useTransformers) {
156+
const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
157+
import('@aws-lambda-powertools/parser/helpers'),
158+
import('@aws-lambda-powertools/parser/schemas/sqs'),
159+
]);
160+
return SqsRecordSchema.extend({
161+
body: JSONStringified(schema as any),
162+
});
163+
}
164+
const { SqsRecordSchema } = await import(
165+
'@aws-lambda-powertools/parser/schemas/sqs'
166+
);
167+
return SqsRecordSchema.extend({ body: schema });
168+
}
169+
170+
case EventType.KinesisDataStreams: {
171+
if (useTransformers) {
172+
const [
173+
{ Base64Encoded },
174+
{ KinesisDataStreamRecord, KinesisDataStreamRecordPayload },
175+
] = await Promise.all([
176+
import('@aws-lambda-powertools/parser/helpers'),
177+
import('@aws-lambda-powertools/parser/schemas/kinesis'),
178+
]);
179+
return KinesisDataStreamRecord.extend({
180+
kinesis: KinesisDataStreamRecordPayload.extend({
181+
data: Base64Encoded(schema as any),
182+
}),
183+
});
184+
}
185+
const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
186+
await import('@aws-lambda-powertools/parser/schemas/kinesis');
187+
return KinesisDataStreamRecord.extend({
188+
kinesis: KinesisDataStreamRecordPayload.extend({ data: schema }),
189+
});
190+
}
191+
192+
case EventType.DynamoDBStreams: {
193+
if (useTransformers) {
194+
const [
195+
{ DynamoDBMarshalled },
196+
{ DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase },
197+
] = await Promise.all([
198+
import('@aws-lambda-powertools/parser/helpers/dynamodb'),
199+
import('@aws-lambda-powertools/parser/schemas/dynamodb'),
200+
]);
201+
return DynamoDBStreamRecord.extend({
202+
dynamodb: DynamoDBStreamChangeRecordBase.extend({
203+
OldImage: DynamoDBMarshalled<StreamRecord['OldImage']>(
204+
schema as any
205+
).optional(),
206+
NewImage: DynamoDBMarshalled<StreamRecord['NewImage']>(
207+
schema as any
208+
).optional(),
209+
}),
210+
});
211+
}
212+
const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
213+
await import('@aws-lambda-powertools/parser/schemas/dynamodb');
214+
return DynamoDBStreamRecord.extend({
215+
dynamodb: DynamoDBStreamChangeRecordBase.extend({
216+
OldImage: (schema as any).optional(),
217+
NewImage: (schema as any).optional(),
218+
}),
219+
});
220+
}
221+
222+
default: {
223+
console.warn(
224+
`The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}`
225+
);
226+
throw new Error('Unsupported event type');
227+
}
228+
}
229+
}
230+
231+
/**
232+
* Parse the record according to the schema and event type passed.
233+
*
234+
* If the passed schema is already an extended schema,
235+
* use the schema directly to parse the record.
236+
*
237+
* Only Zod Schemas are supported for schema extension.
238+
*
239+
* @param record - The record to be parsed
240+
* @param eventType - The type of event to process
241+
* @param schema - The StandardSchema to be used for parsing
242+
*/
243+
async #parseRecord(
244+
record: EventSourceDataClassTypes,
245+
eventType: keyof typeof EventType,
246+
schema: StandardSchemaV1
247+
): Promise<EventSourceDataClassTypes> {
248+
const { parse } = await import('@aws-lambda-powertools/parser');
249+
// Try parsing with the original schema first
250+
const extendedSchemaParsing = parse(record, undefined, schema, true);
251+
if (extendedSchemaParsing.success) {
252+
return extendedSchemaParsing.data as EventSourceDataClassTypes;
253+
}
254+
// Only proceed with schema extension if it's a Zod schema
255+
if (schema['~standard'].vendor !== SchemaVendor.Zod) {
256+
console.warn(
257+
'The schema provided is not supported. Only Zod schemas are supported for extension.'
258+
);
259+
throw new Error('Unsupported schema type');
260+
}
261+
// Handle schema extension based on event type
262+
// Try without transformers first, then with transformers
263+
const schemaWithoutTransformers = await this.#createExtendedSchema({
264+
eventType,
265+
schema,
266+
useTransformers: false,
267+
});
268+
const schemaWithoutTransformersParsing = parse(
269+
record,
270+
undefined,
271+
schemaWithoutTransformers,
272+
true
273+
);
274+
if (schemaWithoutTransformersParsing.success) {
275+
return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes;
276+
}
277+
const schemaWithTransformers = await this.#createExtendedSchema({
278+
eventType,
279+
schema,
280+
useTransformers: true,
281+
});
282+
const schemaWithTransformersParsing = parse(
283+
record,
284+
undefined,
285+
schemaWithTransformers,
286+
true
287+
);
288+
if (schemaWithTransformersParsing.success) {
289+
return schemaWithTransformersParsing.data as EventSourceDataClassTypes;
290+
}
291+
throw new Error('Failed to parse record');
292+
}
124293
}
125294

126295
export { BatchProcessor };

packages/batch/src/constants.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ const EventType = {
1717
DynamoDBStreams: 'DynamoDBStreams',
1818
} as const;
1919

20+
/**
21+
* Enum of supported schema vendors for the utility
22+
*/
23+
const SchemaVendor = {
24+
Zod: 'zod',
25+
} as const;
26+
2027
/**
2128
* Default response for the partial batch processor
2229
*/
@@ -35,4 +42,4 @@ const DATA_CLASS_MAPPING = {
3542
record as DynamoDBRecord,
3643
};
3744

38-
export { EventType, DEFAULT_RESPONSE, DATA_CLASS_MAPPING };
45+
export { EventType, SchemaVendor, DEFAULT_RESPONSE, DATA_CLASS_MAPPING };

0 commit comments

Comments (0)