Skip to content

Commit

Permalink
create SQS S3 message queue adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasAribart committed Mar 20, 2024
1 parent 3cedccd commit 6d08bfb
Show file tree
Hide file tree
Showing 43 changed files with 833 additions and 135 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/release-to-npm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ jobs:
with:
token: ${{ secrets.NPM_TOKEN }}
package: ./packages/message-queue-adapter-sqs/package.json
- uses: JS-DevTools/npm-publish@v2
with:
token: ${{ secrets.NPM_TOKEN }}
package: ./packages/message-queue-adapter-sqs-s3/package.json
- uses: JS-DevTools/npm-publish@v2
with:
token: ${{ secrets.NPM_TOKEN }}
Expand Down
4 changes: 4 additions & 0 deletions castore.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@
"path": "packages/message-queue-adapter-sqs",
"name": "📨 SQS"
},
{
"path": "packages/message-queue-adapter-sqs-s3",
"name": "📨 SQS + S3"
},
{
"path": "packages/message-queue-adapter-in-memory",
"name": "📨 InMemory"
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/3-reacting-to-events/2-message-queues.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ import type {
} from '@castore/message-queue-adapter-sqs';

const appMessagesWorker = async ({ Records }: SQSMessageQueueMessage) => {
Records.forEach(({ body }) => {
for (const { body } of Records) {
// 👇 Correctly typed!
const recordBody: SQSMessageQueueMessageBody<typeof appMessageQueue> =
JSON.parse(body);
});
}
};
```

Expand Down
1 change: 1 addition & 0 deletions docs/docs/4-packages.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ To add run-time validation to your commands:
## 📨 Message Queue Adapters

- [SQS Message Queue Adapter](https://www.npmjs.com/package/@castore/message-queue-adapter-sqs): Implementation of the `MessageQueueAdapter` interface based on AWS SQS.
- [SQS + S3 Message Queue Adapter](https://www.npmjs.com/package/@castore/message-queue-adapter-sqs-s3): Implementation of the `MessageQueueAdapter` interface based on AWS SQS and S3.
- [In-Memory Message Queue Adapter](https://www.npmjs.com/package/@castore/message-queue-adapter-in-memory): Implementation of the `MessageQueueAdapter` interface using a local Node/JS queue. To be used in manual or unit tests.

## 🚌 Message Buses Adapters
Expand Down
4 changes: 2 additions & 2 deletions packages/command-json-schema/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# JSON Schema Command

DRY Castore [`Command`](https://github.com/castore-dev/castore/#--command) definition using [JSON Schemas](http://json-schema.org/understanding-json-schema/reference/index.html) and [`json-schema-to-ts`](https://github.com/ThomasAribart/json-schema-to-ts).
DRY Castore [`Command`](https://castore-dev.github.io/castore/docs/event-sourcing/pushing-events/) definition using [JSON Schemas](http://json-schema.org/understanding-json-schema/reference/index.html) and [`json-schema-to-ts`](https://github.com/ThomasAribart/json-schema-to-ts).

## 📥 Installation

Expand Down Expand Up @@ -104,7 +104,7 @@ const pokemonAppearCommand = new Command<

## ⚙️ Properties & Methods

`JSONSchemaCommand` implements the [`Command`](https://github.com/castore-dev/castore/#--command) class and adds the following properties to it:
`JSONSchemaCommand` implements the [`Command`](https://castore-dev.github.io/castore/docs/event-sourcing/pushing-events/) class and adds the following properties to it:

- <code>inputSchema <i>(?object)</i></code>: The command input JSON schema

Expand Down
4 changes: 2 additions & 2 deletions packages/command-zod/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Zod Command

DRY Castore [`Command`](https://github.com/castore-dev/castore/#--command) definition using [`zod`](https://github.com/colinhacks/zod).
DRY Castore [`Command`](https://castore-dev.github.io/castore/docs/event-sourcing/pushing-events/) definition using [`zod`](https://github.com/colinhacks/zod).

## 📥 Installation

Expand Down Expand Up @@ -91,7 +91,7 @@ const pokemonAppearCommand = new Command<

## ⚙️ Properties & Methods

`ZodCommand` implements the [`Command`](https://github.com/castore-dev/castore/#--command) class and adds the following properties to it:
`ZodCommand` implements the [`Command`](https://castore-dev.github.io/castore/docs/event-sourcing/pushing-events/) class and adds the following properties to it:

- <code>inputSchema <i>(?object)</i></code>: The command input zod schema

Expand Down
6 changes: 3 additions & 3 deletions packages/event-storage-adapter-dynamodb/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# DynamoDB Event Storage Adapter

DRY Castore [`EventStorageAdapter`](https://github.com/castore-dev/castore/#--eventstorageadapter) implementation using [AWS DynamoDB](https://aws.amazon.com/dynamodb/).
DRY Castore [`EventStorageAdapter`](https://castore-dev.github.io/castore/docs/event-sourcing/fetching-events/) implementation using [AWS DynamoDB](https://aws.amazon.com/dynamodb/).

## 📥 Installation

Expand Down Expand Up @@ -227,7 +227,7 @@ resource "aws_dynamodb_table" "pokemons-events-table" {

### 🤝 EventGroups

This adapter implements the [EventGroups](https://github.com/castore-dev/castore/#event-groups) API using the [DynamoDB Transactions API](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/transaction-apis.html):
This adapter implements the [EventGroups](https://castore-dev.github.io/castore/docs/event-sourcing/joining-data/) API using the [DynamoDB Transactions API](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/transaction-apis.html):

```ts
import { EventStore } from '@castore/core';
Expand Down Expand Up @@ -459,7 +459,7 @@ resource "aws_dynamodb_table" "pokemons-events-table" {

### 🤝 EventGroups

This adapter implements the [EventGroups](https://github.com/castore-dev/castore/#event-groups) API using the [DynamoDB Transactions API](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/transaction-apis.html):
This adapter implements the [EventGroups](https://castore-dev.github.io/castore/docs/event-sourcing/joining-data/) API using the [DynamoDB Transactions API](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/transaction-apis.html):

```ts
import { EventStore } from '@castore/core';
Expand Down
2 changes: 1 addition & 1 deletion packages/event-storage-adapter-http/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# HTTP Event Storage Adapter

DRY Castore [`EventStorageAdapter`](https://github.com/castore-dev/castore/#--eventstorageadapter) implementation using a HTTP API.
DRY Castore [`EventStorageAdapter`](https://castore-dev.github.io/castore/docs/event-sourcing/fetching-events/) implementation using a HTTP API.

This class is mainly useful when you already have the logic for events implemented and you want to expose your methods for a front-end to use them, eg.

Expand Down
2 changes: 1 addition & 1 deletion packages/event-storage-adapter-in-memory/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# In Memory Event Storage Adapter

DRY Castore [`EventStorageAdapter`](https://github.com/castore-dev/castore/#--eventstorageadapter) implementation using a JS object.
DRY Castore [`EventStorageAdapter`](https://castore-dev.github.io/castore/docs/event-sourcing/fetching-events/) implementation using a JS object.

This class is mainly useful for manual and unit tests. It is obviously not recommended for production uses 🙂

Expand Down
2 changes: 1 addition & 1 deletion packages/event-storage-adapter-redux/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Redux Event Storage Adapter

DRY Castore [`EventStorageAdapter`](https://github.com/castore-dev/castore/#--eventstorageadapter) implementation using a Redux store.
DRY Castore [`EventStorageAdapter`](https://castore-dev.github.io/castore/docs/event-sourcing/fetching-events/) implementation using a Redux store.

## 📥 Installation

Expand Down
4 changes: 2 additions & 2 deletions packages/event-type-json-schema/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# JSON Schema Event

DRY Castore [`EventType`](https://github.com/castore-dev/castore/#--eventtype) definition using [JSON Schemas](http://json-schema.org/understanding-json-schema/reference/index.html) and [`json-schema-to-ts`](https://github.com/ThomasAribart/json-schema-to-ts)
DRY Castore [`EventType`](https://castore-dev.github.io/castore/docs/event-sourcing/events/) definition using [JSON Schemas](http://json-schema.org/understanding-json-schema/reference/index.html) and [`json-schema-to-ts`](https://github.com/ThomasAribart/json-schema-to-ts)

## 📥 Installation

Expand Down Expand Up @@ -68,7 +68,7 @@ const pokemonAppearedEventType = new EventType<

## ⚙️ Properties & Methods

`JSONSchemaEventType` implements the [`EventType`](https://github.com/castore-dev/castore/#--eventtype) class and adds the following properties to it:
`JSONSchemaEventType` implements the [`EventType`](https://castore-dev.github.io/castore/docs/event-sourcing/events/) class and adds the following properties to it:

- <code>payloadSchema <i>(?object)</i></code>: The event type payload JSON schema

Expand Down
4 changes: 2 additions & 2 deletions packages/event-type-zod/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Zod Event

DRY Castore [`EventType`](https://github.com/castore-dev/castore/#--eventtype) definition using [`zod`](https://github.com/colinhacks/zod).
DRY Castore [`EventType`](https://castore-dev.github.io/castore/docs/event-sourcing/events/) definition using [`zod`](https://github.com/colinhacks/zod).

## 📥 Installation

Expand Down Expand Up @@ -60,7 +60,7 @@ const pokemonAppearedEventType = new EventType<

## ⚙️ Properties & Methods

`ZodEventType` implements the [`EventType`](https://github.com/castore-dev/castore/#--eventtype) class and adds the following properties to it:
`ZodEventType` implements the [`EventType`](https://castore-dev.github.io/castore/docs/event-sourcing/events/) class and adds the following properties to it:

- <code>payloadSchema <i>(?object)</i></code>: The event type payload zod schema

Expand Down
8 changes: 4 additions & 4 deletions packages/message-bus-adapter-event-bridge-s3/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# EventBridge Message Bus Adapter
# EventBridge + S3 Message Bus Adapter

DRY Castore [`MessageBus`](https://github.com/castore-dev/castore/#--messagebus) definition using [AWS EventBridge](https://aws.amazon.com/eventbridge/) and [AWS S3](https://aws.amazon.com/s3/).
DRY Castore [`MessageBus`](https://castore-dev.github.io/castore/docs/reacting-to-events/message-buses/) definition using [AWS EventBridge](https://aws.amazon.com/eventbridge/) and [AWS S3](https://aws.amazon.com/s3/).

This adapter works like the [EventBridge Message Bus Adapter](https://www.npmjs.com/package/@castore/message-bus-adapter-event-bridge) (it actually uses it under the hood), excepts that entry sizes are checked before publishing messages to EventBridge. If they are over the [256KB limit](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-putevent-size.html), they are written on a s3 bucket instead, and a message is sent containing a pre-signed URL, as [recommended by AWS](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-putevent-size.html).

Expand All @@ -16,7 +16,7 @@ npm install @castore/message-bus-adapter-event-bridge-s3
yarn add @castore/message-bus-adapter-event-bridge-s3
```

This package has `@castore/core`, `@aws-sdk/client-eventbridge` (above v3), `@aws-sdk/client-eventbridge` (above v3) and `@aws-sdk/s3-request-presigner` (above v3) as peer dependencies, so you will have to install them as well:
This package has `@castore/core`, `@aws-sdk/client-eventbridge` (above v3), `@aws-sdk/client-s3` (above v3) and `@aws-sdk/s3-request-presigner` (above v3) as peer dependencies, so you will have to install them as well:

```bash
# npm
Expand Down Expand Up @@ -134,4 +134,4 @@ const listener = async (

The `publishMessage` method requires the `events:PutEvents` IAM permission on the provided event bus, as well as the `s3:putObject` and `s3:getObject` IAM permissions on the provided s3 bucket at the desired keys (e.g. `my-bucket-name/temporary-storage/*`).

The `parseMessage` doesn't require any permission as the messageURL is pre-signed.
The `parseMessage` util doesn't require any permission as the messageURL is pre-signed.
106 changes: 62 additions & 44 deletions packages/message-bus-adapter-event-bridge-s3/src/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable max-lines */
import type {
EventBridgeClient,
PutEventsRequestEntry,
Expand All @@ -13,7 +14,10 @@ import type { Message, MessageChannelAdapter } from '@castore/core';
import { isEventCarryingMessage } from '@castore/core';
import { EventBridgeMessageBusAdapter } from '@castore/message-bus-adapter-event-bridge';

import { getEntrySize, PUT_EVENTS_ENTRIES_SIZE_LIMIT } from './getEntrySize';
import {
getFormattedMessageSize,
PUT_EVENTS_ENTRIES_SIZE_LIMIT,
} from './getFormattedMessageSize';
import type { OversizedEntryDetail } from './message';

const EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE = 10;
Expand All @@ -29,8 +33,8 @@ export class EventBridgeS3MessageBusAdapter implements MessageChannelAdapter {
s3PreSignatureExpirationInSec: number;

getS3BucketName: () => string;
publishEntry: (
entry: PutEventsRequestEntry,
publishFormattedMessage: (
formattedMessage: PutEventsRequestEntry,
message: Message,
) => Promise<void>;

Expand Down Expand Up @@ -64,14 +68,19 @@ export class EventBridgeS3MessageBusAdapter implements MessageChannelAdapter {
: this.s3BucketName();

this.publishMessage = (message, options) =>
this.publishEntry(
this.eventBridgeMessageBusAdapter.getEntry(message, options),
this.publishFormattedMessage(
this.eventBridgeMessageBusAdapter.formatMessage(message, options),
message,
);

this.publishEntry = async (entry, message) => {
if (getEntrySize(entry) <= PUT_EVENTS_ENTRIES_SIZE_LIMIT) {
return this.eventBridgeMessageBusAdapter.publishEntry(entry);
this.publishFormattedMessage = async (formattedMessage, message) => {
if (
getFormattedMessageSize(formattedMessage) <=
PUT_EVENTS_ENTRIES_SIZE_LIMIT
) {
return this.eventBridgeMessageBusAdapter.publishFormattedMessage(
formattedMessage,
);
}

const { eventStoreId } = message;
Expand Down Expand Up @@ -111,72 +120,81 @@ export class EventBridgeS3MessageBusAdapter implements MessageChannelAdapter {

const oversizedEntryDetail: OversizedEntryDetail = { messageUrl };

return this.eventBridgeMessageBusAdapter.publishEntry({
...entry,
return this.eventBridgeMessageBusAdapter.publishFormattedMessage({
...formattedMessage,
Detail: JSON.stringify(oversizedEntryDetail),
});
};

this.publishMessages = async (messages, options) => {
const entries = messages.map(message =>
this.eventBridgeMessageBusAdapter.getEntry(message, options),
const formattedMessages = messages.map(message =>
this.eventBridgeMessageBusAdapter.formatMessage(message, options),
);

type EntryWithContext = {
type FormattedMessageWithContext = {
message: Message;
entry: PutEventsRequestEntry;
entrySize: number;
formattedMessage: PutEventsRequestEntry;
formattedMessageSize: number;
};

const entriesWithContext: EntryWithContext[] = entries.map(
(entry, index) => ({
entry,
entrySize: getEntrySize(entry),
const formattedMessagesWithContext: FormattedMessageWithContext[] =
formattedMessages.map((formattedMessage, index) => ({
formattedMessage,
formattedMessageSize: getFormattedMessageSize(formattedMessage),
message: messages[index] as Message,
}),
);
}));

entriesWithContext.sort(
({ entrySize: sizeA }, { entrySize: sizeB }) => sizeA - sizeB,
formattedMessagesWithContext.sort(
({ formattedMessageSize: sizeA }, { formattedMessageSize: sizeB }) =>
sizeA - sizeB,
);

const entryBatches: EntryWithContext[][] = [[]];
let currentBatch = entryBatches[0] as EntryWithContext[];
const formattedMessageBatches: FormattedMessageWithContext[][] = [[]];
let currentBatch =
formattedMessageBatches[0] as FormattedMessageWithContext[];
let currentBatchSize = 0;

// NOTE: We could search for the largest fitting entry instead of doing a for loop
for (const entryWithContext of entriesWithContext) {
const { entrySize } = entryWithContext;
// NOTE: We could search for the largest fitting formattedMessage instead of doing a for loop
for (const formattedMessageWithContext of formattedMessagesWithContext) {
const { formattedMessageSize } = formattedMessageWithContext;

if (
currentBatch.length < EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE &&
currentBatchSize + entrySize <= PUT_EVENTS_ENTRIES_SIZE_LIMIT
currentBatchSize + formattedMessageSize <=
PUT_EVENTS_ENTRIES_SIZE_LIMIT
) {
currentBatch.push(entryWithContext);
currentBatchSize += entrySize;
currentBatch.push(formattedMessageWithContext);
currentBatchSize += formattedMessageSize;
} else {
entryBatches.push([entryWithContext]);
currentBatch = entryBatches.at(-1) as EntryWithContext[];
currentBatchSize = entrySize;
formattedMessageBatches.push([formattedMessageWithContext]);
currentBatch = formattedMessageBatches.at(
-1,
) as FormattedMessageWithContext[];
currentBatchSize = formattedMessageSize;
}
}

for (const entryBatch of entryBatches) {
if (entryBatch.length === 1) {
const [messageAndEntry] = entryBatch as [EntryWithContext];
const { entry, message } = messageAndEntry;
for (const formattedMessageBatch of formattedMessageBatches) {
if (formattedMessageBatch.length === 0) {
// Can happen for first batch if first message is oversized
continue;
}

if (formattedMessageBatch.length === 1) {
const [formattedMessageWithContext] = formattedMessageBatch as [
FormattedMessageWithContext,
];
const { formattedMessage, message } = formattedMessageWithContext;

// TODO: create publishEntry(entry, message) method
return this.publishEntry(entry, message);
await this.publishFormattedMessage(formattedMessage, message);
continue;
}

// We are sure that the batch is not oversized if there is more than 1 entry
return this.eventBridgeMessageBusAdapter.publishEntries(
entryBatch.map(({ entry }) => entry),
await this.eventBridgeMessageBusAdapter.publishFormattedMessages(
formattedMessageBatch.map(({ formattedMessage }) => formattedMessage),
);
}

await this.eventBridgeMessageBusAdapter.publishEntries(entries);
};
}

Expand Down
31 changes: 0 additions & 31 deletions packages/message-bus-adapter-event-bridge-s3/src/getEntrySize.ts

This file was deleted.

Loading

0 comments on commit 6d08bfb

Please sign in to comment.