Skip to content
This repository has been archived by the owner on Oct 2, 2024. It is now read-only.

feat: add jetstream pull wrappers #482

Merged
merged 5 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions components/channel/jetstreamPull.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { camelCase, getMessageType, realizeParametersForChannelWrapper, renderJSDocParameters, messageHasNullPayload, realizeChannelName} from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';
import { unwrap } from './ChannelParameterUnwrap';

/**
* Component which returns a subscribe to function for the client
*
* @param {string} defaultContentType
* @param {string} channelName to publish to
* @param {Message} message which is being received
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamPull(channelName, message, channelParameters) {
const messageType = getMessageType(message);
let parameters = [];
parameters = Object.entries(channelParameters).map(([parameterName]) => {
return `${camelCase(parameterName)}Param`;
});
const hasNullPayload = messageHasNullPayload(message.payload());

//Determine the callback process when receiving messages.
//If the message payload is null no hooks are called to process the received data.
let whenReceivingMessage = `onDataCallback(undefined, null ${parameters.length > 0 && `, ${parameters.join(',')}`});`;
if (!hasNullPayload) {
whenReceivingMessage = `
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, ${messageType}.unmarshal(receivedData) ${parameters.length > 0 && `, ${parameters.join(',')}`}, msg);
`;
}
return `
/**
* Internal functionality to setup jetstrema pull on the \`${channelName}\` channel
*
* @param onDataCallback to call when messages are received
* @param js client to pull with
* @param codec used to convert messages
${renderJSDocParameters(channelParameters)}
*/
export function jetStreamPull(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : ${messageType}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any >
${realizeParametersForChannelWrapper(channelParameters)},
) {
const stream = ${realizeChannelName(channelParameters, channelName)};
(async () => {
const msg = await js.pull(stream, 'durableName');

${unwrap(channelName, channelParameters)}

${whenReceivingMessage}
})();
}
`;
}
36 changes: 36 additions & 0 deletions components/index/jetstreamPull.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { camelCase, getMessageType, realizeParametersForChannelWrapper, renderJSDocParameters, realizeParametersForChannelWithoutType, pascalCase} from '../../utils/index';

export function JetstreamPull(channelName, message, messageDescription, channelParameters) {
return `
/**
* JetStream pull function.
*
* Pull message from \`${channelName}\`
*
* ${messageDescription}
*
* @param onDataCallback to call when messages are received
${renderJSDocParameters(channelParameters)}
* @param options to pull message with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamPull${pascalCase(channelName)}(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg?: ${getMessageType(message)}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void
${realizeParametersForChannelWrapper(channelParameters)}
): void {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
${camelCase(channelName)}Channel.jetStreamPull(
onDataCallback,
this.js,
this.codec
${Object.keys(channelParameters).length ? ` ,${realizeParametersForChannelWithoutType(channelParameters)},` : ''}
);
} else {
throw NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED);
}
}
`;
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,37 @@ export function subscribe(
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
/**
* Internal functionality to setup jetstrema pull on the `streetlight/{streetlight_id}/command/turnon` channel
*
* @param onDataCallback to call when messages are received
* @param js client to pull with
* @param codec used to convert messages
* @param streetlight_id parameter to use in topic
*/
export function jetStreamPull(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any > , streetlight_id: string,
) {
const stream = `streetlight.${streetlight_id}.command.turnon`;
(async () => {
const msg = await js.pull(stream, 'durableName');
const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`;
let channel = msg.subject;
const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}");
const splits = [
streetlightIdSplit[0],
streetlightIdSplit[1]
];
channel = channel.substring(splits[0].length);
const streetlightIdEnd = channel.indexOf(splits[1]);
const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd);
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam, msg);
})();
}
27 changes: 27 additions & 0 deletions examples/simple-subscribe/asyncapi-nats-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,31 @@ export class NatsAsyncApiClient {
}
});
}
/**
* JetStream pull function.
*
* Pull message from `streetlight/{streetlight_id}/command/turnon`
*
* Channel for the turn on command which should turn on the streetlight
*
* @param onDataCallback to call when messages are received
* @param streetlight_id parameter to use in topic
* @param options to pull message with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamPullStreetlightStreetlightIdCommandTurnon(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void, streetlight_id: string
): void {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
streetlightStreetlightIdCommandTurnonChannel.jetStreamPull(
onDataCallback,
this.js,
this.codec, streetlight_id,
);
} else {
throw NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED);
}
}
}
8 changes: 7 additions & 1 deletion template/src/channels/$$channel$$.ts.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { General } from '../../../components/channel/general';
import { pascalCase, isRequestReply, isReplier, isRequester, isPubsub, camelCase} from '../../../utils/index';
// eslint-disable-next-line no-unused-vars
import { AsyncAPIDocument, Channel } from '@asyncapi/parser';
import { JetstreamPull } from '../../../components/channel/jetstreamPull';
import { JetstreamPublish } from '../../../components/channel/jetstreamPublish';

/**
Expand Down Expand Up @@ -70,11 +71,16 @@ function getChannelCode(channel, channelName, params) {
channelcode = `${publishCode} \n${jetstreamPublishCode}`;
}
if (channel.hasPublish()) {
channelcode = Subscribe(
const normalSubscribeCode = Subscribe(
channelName,
publishMessage,
channel.parameters(),
publishOperation);
const jetstreamPullCode = JetstreamPull(
channelName,
publishMessage,
channel.parameters());
channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}`;
}
}
return channelcode;
Expand Down
9 changes: 8 additions & 1 deletion template/src/index.ts.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Request } from '../../components/index/request';
import { isRequestReply, isReplier, isRequester, isPubsub} from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { AsyncAPIDocument } from '@asyncapi/parser';
import { JetstreamPull } from '../../components/index/jetstreamPull';
import { JetstreamPublish } from '../../components/index/jetstreamPublish';

/**
Expand Down Expand Up @@ -75,11 +76,17 @@ function getChannelWrappers(asyncapi, params) {
return `${normalPublish} \n ${jetStreamPublish}`;
}
if (channel.hasPublish()) {
return Subscribe(
const normalSubscribeCode = Subscribe(
channelName,
publishMessage,
channelDescription,
channelParameters);
const jetstreamPullCode = JetstreamPull(
channelName,
publishMessage,
channelDescription,
channelParameters);
return `${normalSubscribeCode}\n${jetstreamPullCode}`;
}
}
});
Expand Down
9 changes: 8 additions & 1 deletion template/src/testclient/index.ts.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Request } from '../../../components/index/request';
import { isRequestReply, isReplier, isRequester, isPubsub} from '../../../utils/index';
// eslint-disable-next-line no-unused-vars
import { AsyncAPIDocument, ChannelParameter } from '@asyncapi/parser';
import { JetstreamPull } from '../../../components/index/jetstreamPull';
import { JetstreamPublish } from '../../../components/index/jetstreamPublish';

/**
Expand Down Expand Up @@ -61,11 +62,17 @@ function getChannelWrappers(asyncapi, params) {

if (isPubsub(channel)) {
if (channel.hasSubscribe()) {
return Subscribe(
const normalSubscribeCode = Subscribe(
channelName,
subscribeMessage,
channelDescription,
channelParameters);
const jetstreamPullCode = JetstreamPull(
channelName,
subscribeMessage,
channelDescription,
channelParameters);
return `${normalSubscribeCode}\n${jetstreamPullCode}`;
}
if (channel.hasPublish()) {
const normalPublish = Publish(
Expand Down
8 changes: 7 additions & 1 deletion template/src/testclient/testchannels/$$channel$$.ts.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { General } from '../../../../components/channel/general';
import { pascalCase, isRequestReply, isReplier, isRequester, isPubsub, camelCase} from '../../../../utils/index';
// eslint-disable-next-line no-unused-vars
import { AsyncAPIDocument, Channel } from '@asyncapi/parser';
import { JetstreamPull } from '../../../../components/channel/jetstreamPull';
import { JetstreamPublish } from '../../../../components/channel/jetstreamPublish';

/**
Expand Down Expand Up @@ -58,10 +59,15 @@ function getChannelCode(channel, channelName, params) {

if (isPubsub(channel)) {
if (channel.hasSubscribe()) {
channelcode = Subscribe(
const normalSubscribeCode = Subscribe(
channelName,
channel.subscribe() ? channel.subscribe().message(0) : undefined,
channel.parameters());
const jetstreamPullCode = JetstreamPull(
channelName,
channel.subscribe() ? channel.subscribe().message(0) : undefined,
channel.parameters());
channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}`;
}
if (channel.hasPublish()) {
const publishCode = Publish(
Expand Down