diff --git a/cli/package.json b/cli/package.json index cc5471b21..c4a2817a1 100644 --- a/cli/package.json +++ b/cli/package.json @@ -44,6 +44,7 @@ "@types/debug": "^4.1.12", "@types/js-yaml": "^4.0.5", "@types/json-bigint": "^1.0.4", + "@types/lodash": "^4.17.1", "@types/node": "^17.0.43", "@types/pump": "^1.1.1", "@types/readable-stream": "^2.3.13", diff --git a/cli/src/index.ts b/cli/src/index.ts index 1bb7eae51..f1d445e09 100755 --- a/cli/src/index.ts +++ b/cli/src/index.ts @@ -501,6 +501,7 @@ export class Commander { 'load the parameters from the local configuration file, which supports json and yaml format, default path is ./mqttx-cli-config.json', ) .option('--file-read ', 'read the message body from the file', parseFileRead) + .option('--split ', 'split the input message in a single file by a specified character.') .allowUnknownOption(false) .action(benchPub) diff --git a/cli/src/lib/pub.ts b/cli/src/lib/pub.ts index 0265eab48..306e638d4 100644 --- a/cli/src/lib/pub.ts +++ b/cli/src/lib/pub.ts @@ -10,9 +10,10 @@ import delay from '../utils/delay' import { saveConfig, loadConfig } from '../utils/config' import { loadSimulator } from '../utils/simulate' import { serializeProtobufToBuffer } from '../utils/protobuf' -import { readFile, processPath } from '../utils/fileUtils' +import { readFile, processPath, fileDataSplitter, getPublishMessageFromFile } from '../utils/fileUtils' import convertPayload from '../utils/convertPayload' import * as Debug from 'debug' +import _ from 'lodash' const processPublishMessage = ( message: string | Buffer, @@ -228,11 +229,17 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions | fileRead, verbose, maximumReconnectTimes, + split, } = options + // File Handler let fileData: Buffer | string + let splitedMessageArr: string[] = [] if (fileRead) { fileData = handleFileRead(processPath(fileRead)) + if (split) { + splitedMessageArr = fileDataSplitter(fileData, split) + } } checkTopicExists(topic, commandType) @@ -280,6 +287,10 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions | let rate = 0 for (let i = 1; i <= count; i++) { + // Duplicate splited messages array for each connection + const dupSplitedMessageArr: string[] = + split && fileRead && splitedMessageArr.length !== 0 ? _.cloneDeep(splitedMessageArr) : [] + ;((i: number, connOpts: mqtt.IClientOptions) => { const opts = { ...connOpts } @@ -304,6 +315,7 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions | if (limit > 0 && total >= limit) { // Wait for the total number of sent messages to be printed, then exit the process. await delay(1000) + signale.success(`All ${total} messages have been sent, reaching the limit of ${limit}.`) process.exit(0) } // If not initialized or client is not connected or message count exceeds the limit, do not send messages. @@ -322,7 +334,10 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions | publishMessage = simulationResult.message } if (fileRead) { - publishMessage = fileData + publishMessage = await getPublishMessageFromFile(split, dupSplitedMessageArr, fileData, { + total, + fileRead, + }) } client.publish(publishTopic, publishMessage, pubOpts.opts, (err) => { inFlightMessageCount -= 1 diff --git a/cli/src/types/global.d.ts b/cli/src/types/global.d.ts index e6d3ef5af..c9c778732 100644 --- a/cli/src/types/global.d.ts +++ b/cli/src/types/global.d.ts @@ -114,6 +114,7 @@ declare global { messageInterval: number limit: number verbose: boolean + split: string } type OmitSubscribeOptions = Omit< diff --git a/cli/src/utils/fileUtils.ts b/cli/src/utils/fileUtils.ts index 827d36002..e852ca4ea 100644 --- a/cli/src/utils/fileUtils.ts +++ b/cli/src/utils/fileUtils.ts @@ -2,6 +2,7 @@ import fs from 'fs' import path from 'path' import YAML from 'js-yaml' import signale from 'signale' +import delay from './delay' const processPath = (savePath: boolean | string, defaultPath?: string) => { let filePath = '' @@ -91,6 +92,31 @@ const createNextNumberedFileName = (filePath: string): string => { } } +const fileDataSplitter = (data: string | Buffer, split: string): string[] => { + const stringData = data.toString('utf-8') + const splitRegex = new RegExp(split, 'g') + return stringData.split(splitRegex) +} + +const getPublishMessageFromFile = async ( + split: string, + dupSplitedMessageArr: string[], + fileData: string | Buffer, + meta: { total: number; fileRead: string }, +): Promise => { + if (!split) { + return fileData + } + if (dupSplitedMessageArr.length === 0) { + await delay(1000) + signale.success(`All ${meta.total} messages from the ${meta.fileRead} have been successfully sent.`) + process.exit(0) + } + + const unshiftedMessage = dupSplitedMessageArr.shift() + return Buffer.from(unshiftedMessage!) +} + export { processPath, getPathExtname, @@ -102,4 +128,6 @@ export { writeFile, appendFile, createNextNumberedFileName, + fileDataSplitter, + getPublishMessageFromFile, } diff --git a/cli/yarn.lock b/cli/yarn.lock index 7a431d121..0a9577d7a 100644 --- a/cli/yarn.lock +++ b/cli/yarn.lock @@ -89,6 +89,11 @@ resolved "https://registry.yarnpkg.com/@types/json-bigint/-/json-bigint-1.0.4.tgz#250d29e593375499d8ba6efaab22d094c3199ef3" integrity sha512-ydHooXLbOmxBbubnA7Eh+RpBzuaIiQjh8WGJYQB50JFGFrdxW7JzVlyEV7fAXw0T2sqJ1ysTneJbiyNLqZRAag== +"@types/lodash@^4.17.1": + version "4.17.1" + resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.17.1.tgz#0fabfcf2f2127ef73b119d98452bd317c4a17eb8" + integrity sha512-X+2qazGS3jxLAIz5JDXDzglAF3KpijdhFxlf/V1+hEsOUc+HnWi81L/uv/EvGuV90WY+7mPGFCUDGfQC3Gj95Q== + "@types/ms@*": version "0.7.34" resolved "https://registry.yarnpkg.com/@types/ms/-/ms-0.7.34.tgz#10964ba0dee6ac4cd462e2795b6bebd407303433" diff --git a/src/database/services/MessageService.ts b/src/database/services/MessageService.ts index 8310e4848..f284c8c8f 100644 --- a/src/database/services/MessageService.ts +++ b/src/database/services/MessageService.ts @@ -9,7 +9,7 @@ export default class MessageService { // @ts-ignore @InjectRepository(MessageEntity) private messageRepository: Repository, - ) { } + ) {} public static modelToEntity(model: MessageModel, connectionId: string | undefined): MessageEntity { return {