Skip to content

Commit

Permalink
feat(batch): Async Processing of Records for SQS Fifo (#3160)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexander Schueren <sha@amazon.com>
Co-authored-by: Alexander Schueren <am29d@hey.com>
  • Loading branch information
3 people authored Nov 8, 2024
1 parent 3789076 commit e73b575
Show file tree
Hide file tree
Showing 9 changed files with 478 additions and 181 deletions.
12 changes: 9 additions & 3 deletions docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,27 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va

=== "Recommended"

```typescript hl_lines="1-4 8"
```typescript hl_lines="1-4 8 20"
--8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts"
```

1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)

=== "Async processing"

```typescript hl_lines="1-4 8 20"
--8<-- "examples/snippets/batch/gettingStartedSQSFifoAsync.ts"
```

=== "Enabling skipGroupOnError flag"

```typescript hl_lines="1-4 13 30"
--8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts"
```

!!! Note
Note that SqsFifoPartialProcessor is synchronous using `processPartialResponseSync`.
This is because we need to preserve the order of messages in the queue. See [Async or sync processing section](#async-or-sync-processing) for more details.
Note that `SqsFifoPartialProcessor` is synchronous and is used with `processPartialResponseSync`.
If you need asynchronous processing while preserving the order of messages in the queue, use `SqsFifoPartialProcessorAsync` with `processPartialResponse`.

### Processing messages from Kinesis

Expand Down
22 changes: 22 additions & 0 deletions examples/snippets/batch/gettingStartedSQSFifoAsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import {
SqsFifoPartialProcessorAsync,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new SqsFifoPartialProcessorAsync(); // FIFO batch processor that awaits the async record handler one record at a time
const logger = new Logger(); // logger used by the record handler below

const recordHandler = async (record: SQSRecord): Promise<void> => {
  const { body } = record;
  if (!body) {
    return; // empty body: nothing to parse or log
  }
  logger.info('Processed item', { item: JSON.parse(body) });
};

export const handler: SQSHandler = async (event, context) => {
  return processPartialResponse(event, recordHandler, processor, { context });
};
54 changes: 12 additions & 42 deletions packages/batch/src/SqsFifoPartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { SQSRecord } from 'aws-lambda';
import { BatchProcessorSync } from './BatchProcessorSync.js';
import { SqsFifoProcessor } from './SqsFifoProcessor.js';
import { EventType } from './constants.js';
import {
type BatchProcessingError,
Expand Down Expand Up @@ -46,17 +47,13 @@ import type {
*/
class SqsFifoPartialProcessor extends BatchProcessorSync {
/**
* The ID of the current message group being processed.
* Processor for handling SQS FIFO message
*/
#currentGroupId?: string;
/**
* A set of group IDs that have already encountered failures.
*/
#failedGroupIds: Set<string>;
readonly #processor: SqsFifoProcessor;

public constructor() {
super(EventType.SQS);
this.#failedGroupIds = new Set<string>();
this.#processor = new SqsFifoProcessor();
}

/**
Expand All @@ -70,9 +67,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
record: EventSourceDataClassTypes,
exception: Error
): FailureResponse {
if (this.options?.skipGroupOnError && this.#currentGroupId) {
this.#addToFailedGroup(this.#currentGroupId);
}
this.#processor.processFailureForCurrentGroup(this.options);

return super.failureHandler(record, exception);
}
Expand Down Expand Up @@ -101,24 +96,17 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
let currentIndex = 0;
for (const record of this.records) {
this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId);
this.#processor.setCurrentGroup(
(record as SQSRecord).attributes?.MessageGroupId
);

// If we have any failed messages, we should then short circuit the process and
// fail remaining messages unless `skipGroupOnError` is true
const shouldShortCircuit =
!this.options?.skipGroupOnError && this.failureMessages.length !== 0;
if (shouldShortCircuit) {
if (
this.#processor.shouldShortCircuit(this.failureMessages, this.options)
) {
return this.shortCircuitProcessing(currentIndex, processedRecords);
}

// If `skipGroupOnError` is true and the current group has previously failed,
// then we should skip processing the current group.
const shouldSkipCurrentGroup =
this.options?.skipGroupOnError &&
this.#currentGroupId &&
this.#failedGroupIds.has(this.#currentGroupId);

const result = shouldSkipCurrentGroup
const result = this.#processor.shouldSkipCurrentGroup(this.options)
? this.#processFailRecord(
record,
new SqsFifoMessageGroupShortCircuitError()
Expand Down Expand Up @@ -161,15 +149,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
return processedRecords;
}

/**
* Adds the specified group ID to the set of failed group IDs.
*
* @param group - The group ID to be added to the set of failed group IDs.
*/
#addToFailedGroup(group: string): void {
this.#failedGroupIds.add(group);
}

/**
* Processes a fail record.
*
Expand All @@ -184,15 +163,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {

return this.failureHandler(data, exception);
}

/**
* Sets the current group ID for the message being processed.
*
* @param group - The group ID of the current message being processed.
*/
#setCurrentGroup(group?: string): void {
this.#currentGroupId = group;
}
}

export { SqsFifoPartialProcessor };
167 changes: 167 additions & 0 deletions packages/batch/src/SqsFifoPartialProcessorAsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import type { SQSRecord } from 'aws-lambda';
import { BatchProcessor } from './BatchProcessor.js';
import { SqsFifoProcessor } from './SqsFifoProcessor.js';
import { EventType } from './constants.js';
import {
type BatchProcessingError,
SqsFifoMessageGroupShortCircuitError,
SqsFifoShortCircuitError,
} from './errors.js';
import type {
BaseRecord,
EventSourceDataClassTypes,
FailureResponse,
SuccessResponse,
} from './types.js';

/**
 * Batch processor for SQS FIFO queues
 *
 * This class extends the {@link BatchProcessor} class and provides
 * a mechanism to process records from SQS FIFO queues asynchronously.
 *
 * Records are awaited one at a time, in batch order, so the record handler can be
 * asynchronous while the order of the messages is still respected.
 *
 * By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering.
 *
 * However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.
 * When the `skipGroupOnError` option is enabled, processing continues after a failure and only
 * records whose message group has already failed are marked as failed.
 *
 * @example
 * ```typescript
 * import {
 *   SqsFifoPartialProcessorAsync,
 *   processPartialResponse,
 * } from '@aws-lambda-powertools/batch';
 * import type { SQSRecord, SQSHandler } from 'aws-lambda';
 *
 * const processor = new SqsFifoPartialProcessorAsync();
 *
 * const recordHandler = async (record: SQSRecord): Promise<void> => {
 *   const payload = JSON.parse(record.body);
 * };
 *
 * export const handler: SQSHandler = async (event, context) =>
 *   processPartialResponse(event, recordHandler, processor, {
 *     context,
 *   });
 * ```
 */
class SqsFifoPartialProcessorAsync extends BatchProcessor {
  /**
   * Helper that holds the FIFO bookkeeping for the batch being processed
   * (the current message group and the failures reported for it).
   *
   * It is replaced with a fresh instance at the start of every {@link process}
   * call, which is why it is not `readonly`.
   */
  #processor: SqsFifoProcessor;

  public constructor() {
    super(EventType.SQS);
    this.#processor = new SqsFifoProcessor();
  }

  /**
   * Handles a failure for a given record.
   *
   * The failure is first reported to the FIFO helper for the current message group
   * (together with the current options), then handled by the parent class.
   *
   * @param record - The record that failed.
   * @param exception - The error that occurred.
   * @returns The failure response produced by the parent class.
   */
  public failureHandler(
    record: EventSourceDataClassTypes,
    exception: Error
  ): FailureResponse {
    this.#processor.processFailureForCurrentGroup(this.options);

    return super.failureHandler(record, exception);
  }

  /**
   * Process a record with an asynchronous handler
   *
   * This method orchestrates the processing of a batch of records asynchronously
   * for SQS FIFO queues.
   *
   * The method calls the prepare hook to initialize the processor and then
   * iterates over each record in the batch, processing them one by one.
   *
   * If one of them fails and `skipGroupOnError` is not true, the method short circuits
   * the processing and fails the remaining records in the batch.
   *
   * If one of them fails and `skipGroupOnError` is true, then the method fails the current record
   * if the message group has any previous failure, otherwise keeps processing.
   *
   * Then, it calls the clean hook to clean up the processor and returns the
   * processed records.
   *
   * @returns The responses collected while iterating over the batch.
   */
  public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
    // A processor instance is typically created once and reused for many batches.
    // Start each batch with fresh FIFO state so that message groups that failed
    // in a previous batch are not skipped again in this one.
    this.#processor = new SqsFifoProcessor();

    this.prepare();

    const processedRecords: (SuccessResponse | FailureResponse)[] = [];
    let currentIndex = 0;
    for (const record of this.records) {
      this.#processor.setCurrentGroup(
        (record as SQSRecord).attributes?.MessageGroupId
      );

      // Checked before every record: once the helper decides to short circuit,
      // this record and all the following ones are failed without being processed.
      if (
        this.#processor.shouldShortCircuit(this.failureMessages, this.options)
      ) {
        return this.shortCircuitProcessing(currentIndex, processedRecords);
      }

      // Records of a group that must be skipped are failed without invoking the
      // record handler; every other record is awaited before moving to the next.
      const result = this.#processor.shouldSkipCurrentGroup(this.options)
        ? this.#processFailRecord(
            record,
            new SqsFifoMessageGroupShortCircuitError()
          )
        : await this.processRecord(record);

      processedRecords.push(result);
      currentIndex++;
    }

    this.clean();

    return processedRecords;
  }

  /**
   * Starting from the first failure index, fail all remaining messages regardless
   * of their group ID.
   *
   * This short circuit mechanism is used when we detect a failed message in the batch.
   *
   * Since messages in a FIFO queue are processed in order, we must stop processing any
   * remaining messages in the batch to prevent out-of-order processing.
   *
   * Each remaining record goes through {@link failureHandler}; the responses created
   * for them are not appended to `processedRecords`, which is returned as received.
   *
   * @param firstFailureIndex Index of first message that failed
   * @param processedRecords Array of response items that have been processed both successfully and unsuccessfully
   * @returns The `processedRecords` array that was passed in.
   */
  protected shortCircuitProcessing(
    firstFailureIndex: number,
    processedRecords: (SuccessResponse | FailureResponse)[]
  ): (SuccessResponse | FailureResponse)[] {
    const remainingRecords = this.records.slice(firstFailureIndex);

    for (const record of remainingRecords) {
      this.#processFailRecord(record, new SqsFifoShortCircuitError());
    }

    this.clean();

    return processedRecords;
  }

  /**
   * Processes a fail record.
   *
   * Converts the raw record to the processor's event type and routes it through
   * {@link failureHandler} with the given error.
   *
   * @param record - The record that failed.
   * @param exception - The error that occurred.
   * @returns The failure response returned by the failure handler.
   */
  #processFailRecord(
    record: BaseRecord,
    exception: BatchProcessingError
  ): FailureResponse {
    const data = this.toBatchType(record, this.eventType);

    return this.failureHandler(data, exception);
  }
}

export { SqsFifoPartialProcessorAsync };
Loading

0 comments on commit e73b575

Please sign in to comment.