Skip to content

Commit

Permalink
Separate metadata definitions between publishers and consumers, fill …
Browse files Browse the repository at this point in the history
…incomplete metadata entries (#182)
  • Loading branch information
kibertoad authored Jul 2, 2024
1 parent f6612d8 commit 0215b4c
Show file tree
Hide file tree
Showing 18 changed files with 153 additions and 61 deletions.
28 changes: 28 additions & 0 deletions packages/amqp/lib/AmqpQueuePublisherManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,33 @@ describe('AmqpQueuePublisherManager', () => {

expect(result.processingResult).toBe('consumed')
})

it('fills incomplete metadata', async () => {
const { queuePublisherManager } = diContainer.cradle
const fakeConsumer = new FakeQueueConsumer(diContainer.cradle, TestEvents.updated)
await fakeConsumer.start()

const publishedMessage = queuePublisherManager.publishSync(FakeQueueConsumer.QUEUE_NAME, {
type: 'entity.updated',
payload: {
updatedData: 'msg',
},
metadata: {
correlationId: 'some-id',
},
})

const result = await fakeConsumer.handlerSpy.waitForMessageWithId(publishedMessage.id)

expect(result.processingResult).toBe('consumed')
expect(result.message.metadata).toMatchInlineSnapshot(`
{
"correlationId": "some-id",
"originatedFrom": "service",
"producedBy": "service",
"schemaVersion": "1.0.0",
}
`)
})
})
})
4 changes: 2 additions & 2 deletions packages/amqp/lib/AmqpQueuePublisherManager.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type {
CommonCreationConfigType,
EventRegistry,
MessageMetadataType,
MessagePublishType,
MessageSchemaType,
MetadataFiller,
PublisherBaseEventType,
PublisherMessageMetadataType,
} from '@message-queue-toolkit/core'
import { AbstractPublisherManager } from '@message-queue-toolkit/core'
import type { AmqpAwareEventDefinition } from '@message-queue-toolkit/schemas'
Expand Down Expand Up @@ -72,7 +72,7 @@ export class AmqpQueuePublisherManager<
z.infer<SupportedEventDefinitions[number]['publisherSchema']>
>,
SupportedEventDefinitions extends AmqpAwareEventDefinition[],
MetadataType = MessageMetadataType,
MetadataType = PublisherMessageMetadataType,
> extends AbstractPublisherManager<
AmqpAwareEventDefinition,
NonNullable<SupportedEventDefinitions[number]['queueName']>,
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp/lib/AmqpTopicPublisherManager.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { AbstractPublisherManager } from '@message-queue-toolkit/core'
import type {
MessageMetadataType,
MessagePublishType,
MessageSchemaType,
PublisherMessageMetadataType,
} from '@message-queue-toolkit/core'
import type z from 'zod'

Expand All @@ -25,7 +25,7 @@ export class AmqpTopicPublisherManager<
z.infer<SupportedEventDefinitions[number]['publisherSchema']>
>,
SupportedEventDefinitions extends AmqpAwareEventDefinition[],
MetadataType = MessageMetadataType,
MetadataType = PublisherMessageMetadataType,
> extends AbstractPublisherManager<
AmqpAwareEventDefinition,
NonNullable<SupportedEventDefinitions[number]['exchange']>,
Expand Down
9 changes: 6 additions & 3 deletions packages/amqp/test/fakes/CustomFakeConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import type { BaseMessageType } from '@message-queue-toolkit/core'
import type { PublisherBaseMessageType } from '@message-queue-toolkit/core'
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import { AbstractAmqpQueueConsumer } from '../../lib/AbstractAmqpQueueConsumer'
import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService'

export class CustomFakeConsumer extends AbstractAmqpQueueConsumer<BaseMessageType, unknown> {
export class CustomFakeConsumer extends AbstractAmqpQueueConsumer<
PublisherBaseMessageType,
unknown
> {
public static readonly QUEUE_NAME = 'dummy-queue'
constructor(dependencies: AMQPConsumerDependencies, schema: ZodSchema) {
super(
Expand All @@ -20,7 +23,7 @@ export class CustomFakeConsumer extends AbstractAmqpQueueConsumer<BaseMessageTyp
},
handlerSpy: true,
messageTypeField: 'messageType',
handlers: new MessageHandlerConfigBuilder<BaseMessageType, unknown>()
handlers: new MessageHandlerConfigBuilder<PublisherBaseMessageType, unknown>()
.addConfig(schema, () => Promise.resolve({ result: 'success' }))
.build(),
},
Expand Down
9 changes: 6 additions & 3 deletions packages/amqp/test/fakes/FakeQueueConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import type { BaseMessageType } from '@message-queue-toolkit/core'
import type { PublisherBaseMessageType } from '@message-queue-toolkit/core'
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'

import { AbstractAmqpQueueConsumer } from '../../lib/AbstractAmqpQueueConsumer'
import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService'
import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager'

export class FakeQueueConsumer extends AbstractAmqpQueueConsumer<BaseMessageType, unknown> {
export class FakeQueueConsumer extends AbstractAmqpQueueConsumer<
PublisherBaseMessageType,
unknown
> {
public static readonly QUEUE_NAME = 'dummy-queue'
constructor(dependencies: AMQPConsumerDependencies, eventDefinition: AmqpAwareEventDefinition) {
super(
Expand All @@ -23,7 +26,7 @@ export class FakeQueueConsumer extends AbstractAmqpQueueConsumer<BaseMessageType
},
handlerSpy: true,
messageTypeField: 'type',
handlers: new MessageHandlerConfigBuilder<BaseMessageType, unknown>()
handlers: new MessageHandlerConfigBuilder<PublisherBaseMessageType, unknown>()
.addConfig(eventDefinition.consumerSchema, () => Promise.resolve({ result: 'success' }))
.build(),
},
Expand Down
9 changes: 6 additions & 3 deletions packages/amqp/test/fakes/FakeTopicConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import type { BaseMessageType } from '@message-queue-toolkit/core'
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'

import type { PublisherBaseMessageType } from '@message-queue-toolkit/schemas'
import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService'
import { AbstractAmqpTopicConsumer } from '../../lib/AbstractAmqpTopicConsumer'
import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager'

export class FakeTopicConsumer extends AbstractAmqpTopicConsumer<BaseMessageType, unknown> {
export class FakeTopicConsumer extends AbstractAmqpTopicConsumer<
PublisherBaseMessageType,
unknown
> {
public messageCounter = 0

constructor(
Expand Down Expand Up @@ -39,7 +42,7 @@ export class FakeTopicConsumer extends AbstractAmqpTopicConsumer<BaseMessageType
},
handlerSpy: true,
messageTypeField: 'type',
handlers: new MessageHandlerConfigBuilder<BaseMessageType, unknown>()
handlers: new MessageHandlerConfigBuilder<PublisherBaseMessageType, unknown>()
.addConfig(eventDefinition.consumerSchema, () => {
this.messageCounter++
return Promise.resolve({ result: 'success' })
Expand Down
4 changes: 2 additions & 2 deletions packages/core/lib/events/DomainEventEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { InternalError } from '@lokalise/node-core'

import type { MetadataFiller } from '../messages/MetadataFiller'
import type { MessageMetadataType } from '../messages/baseMessageSchemas'
import type { HandlerSpy, HandlerSpyParams, PublicHandlerSpy } from '../queues/HandlerSpy'
import { resolveHandlerSpy } from '../queues/HandlerSpy'

import type { PublisherMessageMetadataType } from '@message-queue-toolkit/schemas'
import type { EventRegistry } from './EventRegistry'
import type {
AnyEventHandler,
Expand Down Expand Up @@ -62,7 +62,7 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
public async emit<SupportedEvent extends SupportedEvents[number]>(
supportedEvent: SupportedEvent,
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>,
metadata?: Partial<MessageMetadataType>,
metadata?: PublisherMessageMetadataType,
): Promise<Omit<CommonEventDefinitionConsumerSchemaType<SupportedEvent>, 'type'>> {
if (!data.timestamp) {
data.timestamp = this.metadataFiller.produceTimestamp()
Expand Down
6 changes: 3 additions & 3 deletions packages/core/lib/events/fakes/FakeListener.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import type { MessageMetadataType } from '../../messages/baseMessageSchemas'
import type { ConsumerMessageMetadataType } from '@message-queue-toolkit/schemas'
import type { AnyEventHandler, CommonEventDefinition } from '../eventTypes'

export class FakeListener<SupportedEvents extends CommonEventDefinition[]>
implements AnyEventHandler<SupportedEvents>
{
public receivedEvents: SupportedEvents[number]['consumerSchema']['_output'][] = []
public receivedMetadata: MessageMetadataType[] = []
public receivedMetadata: ConsumerMessageMetadataType[] = []

constructor(_supportedEvents: SupportedEvents) {
this.receivedEvents = []
}

handleEvent(
event: SupportedEvents[number]['consumerSchema']['_output'],
metadata: MessageMetadataType,
metadata: ConsumerMessageMetadataType,
): void | Promise<void> {
this.receivedEvents.push(event)
this.receivedMetadata.push(metadata)
Expand Down
8 changes: 4 additions & 4 deletions packages/core/lib/messages/MetadataFiller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { randomUUID } from 'node:crypto'
import type { PublisherBaseEventType } from '../events/baseEventSchemas'
import type { CommonEventDefinition } from '../events/eventTypes'

import type { MessageMetadataType } from './baseMessageSchemas'
import type { PublisherMessageMetadataType } from './baseMessageSchemas'

export type IdGenerator = () => string
export type TimestampGenerator = () => string
Expand All @@ -18,7 +18,7 @@ export type MetadataFillerOptions = {
export type MetadataFiller<
T extends PublisherBaseEventType = PublisherBaseEventType,
D = CommonEventDefinition,
M = MessageMetadataType,
M = PublisherMessageMetadataType,
> = {
produceMetadata(currentMessage: T, eventDefinition: D, precedingMessageMetadata?: M): M
produceId(): string
Expand Down Expand Up @@ -49,8 +49,8 @@ export class CommonMetadataFiller implements MetadataFiller {
produceMetadata(
_currentMessage: PublisherBaseEventType,
eventDefinition: Pick<CommonEventDefinition, 'schemaVersion'>,
precedingMessageMetadata?: Omit<MessageMetadataType, 'producedBy'>,
): MessageMetadataType {
precedingMessageMetadata?: Omit<PublisherMessageMetadataType, 'producedBy'>,
): PublisherMessageMetadataType {
return {
producedBy: this.serviceId,
originatedFrom: precedingMessageMetadata?.originatedFrom ?? this.serviceId,
Expand Down
15 changes: 10 additions & 5 deletions packages/core/lib/messages/baseMessageSchemas.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
export {
MESSAGE_METADATA_SCHEMA,
MESSAGE_SCHEMA_EXTENSION,
BASE_MESSAGE_SCHEMA,
BaseMessageType,
MessageMetadataType,
PUBLISHER_MESSAGE_METADATA_SCHEMA,
CONSUMER_MESSAGE_METADATA_SCHEMA,
PUBLISHER_MESSAGE_SCHEMA_EXTENSION,
CONSUMER_MESSAGE_SCHEMA_EXTENSION,
PUBLISHER_BASE_MESSAGE_SCHEMA,
CONSUMER_BASE_MESSAGE_SCHEMA,
ConsumerBaseMessageType,
PublisherBaseMessageType,
PublisherMessageMetadataType,
ConsumerMessageMetadataType,
CommonMessageDefinitionSchemaType,
enrichMessageSchemaWithBase,
} from '@message-queue-toolkit/schemas'
16 changes: 13 additions & 3 deletions packages/core/lib/queues/AbstractPublisherManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,22 @@ export abstract class AbstractPublisherManager<
message: MessagePublishType<SupportedEventDefinitions[number]>,
precedingEventMetadata?: Partial<MetadataType>,
): MessageSchemaType<SupportedEventDefinitions[number]> {
const producedMetadata = this.metadataFiller.produceMetadata(
// @ts-ignore
message,
messageDefinition,
precedingEventMetadata,
)

// @ts-ignore
const resolvedMetadata = message[this.metadataField]
? // @ts-ignore
message[this.metadataField]
? {
...producedMetadata,
// @ts-ignore
...message[this.metadataField],
}
: // @ts-ignore
this.metadataFiller.produceMetadata(message, messageDefinition, precedingEventMetadata)
producedMetadata

// @ts-ignore
return {
Expand Down
5 changes: 3 additions & 2 deletions packages/schemas/lib/events/eventTypes.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { ZodObject, ZodTypeAny } from 'zod'
import type z from 'zod'

import type { MessageMetadataType } from '../messages/baseMessageSchemas'
import type { PublisherMessageMetadataType } from '../messages/baseMessageSchemas'

import type { CONSUMER_BASE_EVENT_SCHEMA, PUBLISHER_BASE_EVENT_SCHEMA } from './baseEventSchemas'

Expand Down Expand Up @@ -40,7 +40,8 @@ export type CommonEventDefinitionPublisherSchemaType<T extends CommonEventDefini
export type EventHandler<
EventDefinitionSchema extends
CommonEventDefinitionConsumerSchemaType<CommonEventDefinition> = CommonEventDefinitionConsumerSchemaType<CommonEventDefinition>,
MetadataDefinitionSchema extends Partial<MessageMetadataType> = Partial<MessageMetadataType>,
MetadataDefinitionSchema extends
Partial<PublisherMessageMetadataType> = Partial<PublisherMessageMetadataType>,
> = {
handleEvent(
event: EventDefinitionSchema,
Expand Down
Loading

0 comments on commit 0215b4c

Please sign in to comment.