Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): bench pub support split file content #1642

Merged
merged 3 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"@types/debug": "^4.1.12",
"@types/js-yaml": "^4.0.5",
"@types/json-bigint": "^1.0.4",
"@types/lodash": "^4.17.1",
"@types/node": "^17.0.43",
"@types/pump": "^1.1.1",
"@types/readable-stream": "^2.3.13",
Expand Down
1 change: 1 addition & 0 deletions cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ export class Commander {
'load the parameters from the local configuration file, which supports json and yaml format, default path is ./mqttx-cli-config.json',
)
.option('--file-read <PATH>', 'read the message body from the file', parseFileRead)
.option('--split <CHARACTER>', 'split the input message in a single file by a specified character.')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should set \n as the default value.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I thought so at first, but I didn't finish.

.allowUnknownOption(false)
.action(benchPub)

Expand Down
19 changes: 17 additions & 2 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import delay from '../utils/delay'
import { saveConfig, loadConfig } from '../utils/config'
import { loadSimulator } from '../utils/simulate'
import { serializeProtobufToBuffer } from '../utils/protobuf'
import { readFile, processPath } from '../utils/fileUtils'
import { readFile, processPath, fileDataSplitter, getPublishMessageFromFile } from '../utils/fileUtils'
import convertPayload from '../utils/convertPayload'
import * as Debug from 'debug'
import _ from 'lodash'

const processPublishMessage = (
message: string | Buffer,
Expand Down Expand Up @@ -228,11 +229,17 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
fileRead,
verbose,
maximumReconnectTimes,
split,
} = options

// File Handler
let fileData: Buffer | string
let splitedMessageArr: string[] = []
if (fileRead) {
fileData = handleFileRead(processPath(fileRead))
if (split) {
splitedMessageArr = fileDataSplitter(fileData, split)
}
}

checkTopicExists(topic, commandType)
Expand Down Expand Up @@ -280,6 +287,10 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
let rate = 0

for (let i = 1; i <= count; i++) {
// Duplicate splited messages array for each connection
const dupSplitedMessageArr: string[] =
split && fileRead && splitedMessageArr.length !== 0 ? _.cloneDeep(splitedMessageArr) : []

;((i: number, connOpts: mqtt.IClientOptions) => {
const opts = { ...connOpts }

Expand All @@ -304,6 +315,7 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
if (limit > 0 && total >= limit) {
// Wait for the total number of sent messages to be printed, then exit the process.
await delay(1000)
signale.success(`All ${total} messages have been sent, reaching the limit of ${limit}.`)
process.exit(0)
}
// If not initialized or client is not connected or message count exceeds the limit, do not send messages.
Expand All @@ -322,7 +334,10 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
publishMessage = simulationResult.message
}
if (fileRead) {
publishMessage = fileData
publishMessage = await getPublishMessageFromFile(split, dupSplitedMessageArr, fileData, {
total,
fileRead,
})
}
client.publish(publishTopic, publishMessage, pubOpts.opts, (err) => {
inFlightMessageCount -= 1
Expand Down
1 change: 1 addition & 0 deletions cli/src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ declare global {
messageInterval: number
limit: number
verbose: boolean
split: string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an optional option, which may be undefined.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the limit, verbose, and others are optional, too, right? Was there no impact before?

Copy link
Member

@Red-Asuka Red-Asuka May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value of limit is 0, and verbose is a boolean type option. The default behavior is false, so it has no effect.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we need to set a default value for the split option as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be similar to the save and config options. If the option is not declared, it will be undefined. If the option is declared but no value is set, it will default to a default value.

}

type OmitSubscribeOptions = Omit<
Expand Down
28 changes: 28 additions & 0 deletions cli/src/utils/fileUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import fs from 'fs'
import path from 'path'
import YAML from 'js-yaml'
import signale from 'signale'
import delay from './delay'

const processPath = (savePath: boolean | string, defaultPath?: string) => {
let filePath = ''
Expand Down Expand Up @@ -91,6 +92,31 @@ const createNextNumberedFileName = (filePath: string): string => {
}
}

const fileDataSplitter = (data: string | Buffer, split: string): string[] => {
const stringData = data.toString('utf-8')
const splitRegex = new RegExp(split, 'g')
return stringData.split(splitRegex)
}

const getPublishMessageFromFile = async (
split: string,
dupSplitedMessageArr: string[],
fileData: string | Buffer,
meta: { total: number; fileRead: string },
): Promise<Buffer | string> => {
if (!split) {
return fileData
}
if (dupSplitedMessageArr.length === 0) {
await delay(1000)
signale.success(`All ${meta.total} messages from the ${meta.fileRead} have been successfully sent.`)
process.exit(0)
}

const unshiftedMessage = dupSplitedMessageArr.shift()
return Buffer.from(unshiftedMessage!)
}

export {
processPath,
getPathExtname,
Expand All @@ -102,4 +128,6 @@ export {
writeFile,
appendFile,
createNextNumberedFileName,
fileDataSplitter,
getPublishMessageFromFile,
}
5 changes: 5 additions & 0 deletions cli/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@
resolved "https://registry.yarnpkg.com/@types/json-bigint/-/json-bigint-1.0.4.tgz#250d29e593375499d8ba6efaab22d094c3199ef3"
integrity sha512-ydHooXLbOmxBbubnA7Eh+RpBzuaIiQjh8WGJYQB50JFGFrdxW7JzVlyEV7fAXw0T2sqJ1ysTneJbiyNLqZRAag==

"@types/lodash@^4.17.1":
version "4.17.1"
resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.17.1.tgz#0fabfcf2f2127ef73b119d98452bd317c4a17eb8"
integrity sha512-X+2qazGS3jxLAIz5JDXDzglAF3KpijdhFxlf/V1+hEsOUc+HnWi81L/uv/EvGuV90WY+7mPGFCUDGfQC3Gj95Q==

"@types/ms@*":
version "0.7.34"
resolved "https://registry.yarnpkg.com/@types/ms/-/ms-0.7.34.tgz#10964ba0dee6ac4cd462e2795b6bebd407303433"
Expand Down
2 changes: 1 addition & 1 deletion src/database/services/MessageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export default class MessageService {
// @ts-ignore
@InjectRepository(MessageEntity)
private messageRepository: Repository<MessageEntity>,
) { }
) {}

public static modelToEntity(model: MessageModel, connectionId: string | undefined): MessageEntity {
return {
Expand Down
Loading