Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cli): fix split option bugs & set default value for split option #1643

Merged
merged 2 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ export class Commander {
'load the parameters from the local configuration file, which supports json and yaml format, default path is ./mqttx-cli-config.json',
)
.option('--file-read <PATH>', 'read the message body from the file', parseFileRead)
.option('--split <CHARACTER>', 'split the input message in a single file by a specified character.')
.option('--split [CHARACTER]', 'split the input message in a single file by a specified character, default is \n')
.allowUnknownOption(false)
.action(benchPub)

Expand Down
33 changes: 25 additions & 8 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import delay from '../utils/delay'
import { saveConfig, loadConfig } from '../utils/config'
import { loadSimulator } from '../utils/simulate'
import { serializeProtobufToBuffer } from '../utils/protobuf'
import { readFile, processPath, fileDataSplitter, getPublishMessageFromFile } from '../utils/fileUtils'
import { readFile, processPath, fileDataSplitter } from '../utils/fileUtils'
import convertPayload from '../utils/convertPayload'
import * as Debug from 'debug'
import _ from 'lodash'
Expand Down Expand Up @@ -256,6 +256,8 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |

let inFlightMessageCount = 0

const splitLimit = splitedMessageArr.length * count

const isNewConnArray = Array(count).fill(true)

const retryTimesArray = Array(count).fill(0)
Expand Down Expand Up @@ -288,8 +290,7 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |

for (let i = 1; i <= count; i++) {
// Duplicate splited messages array for each connection
const dupSplitedMessageArr: string[] =
split && fileRead && splitedMessageArr.length !== 0 ? _.cloneDeep(splitedMessageArr) : []
const dupSplitedMessageArr = splitedMessageArr.length !== 0 ? _.cloneDeep(splitedMessageArr) : []

;((i: number, connOpts: mqtt.IClientOptions) => {
const opts = { ...connOpts }
Expand Down Expand Up @@ -318,8 +319,20 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
signale.success(`All ${total} messages have been sent, reaching the limit of ${limit}.`)
process.exit(0)
}
// If the segmented message has been completely sent, exit the process.
if (splitLimit > 0 && total >= splitLimit) {
// Wait for the total number of sent messages to be printed, then exit the process.
await delay(1000)
signale.success(`All ${total} messages from the ${fileRead} have been successfully sent.`)
process.exit(0)
}
// If not initialized or client is not connected or message count exceeds the limit, do not send messages.
if (!initialized || !client.connected || (limit > 0 && total + inFlightMessageCount >= limit)) {
if (
!initialized ||
!client.connected ||
(limit > 0 && total + inFlightMessageCount >= limit) ||
(splitLimit > 0 && total + inFlightMessageCount >= splitLimit)
) {
return
}
inFlightMessageCount += 1
Expand All @@ -334,10 +347,14 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
publishMessage = simulationResult.message
}
if (fileRead) {
publishMessage = await getPublishMessageFromFile(split, dupSplitedMessageArr, fileData, {
total,
fileRead,
})
if (!split) {
publishMessage = fileData
} else {
if (dupSplitedMessageArr.length === 0) {
return
}
publishMessage = Buffer.from(dupSplitedMessageArr.shift()!)
}
}
client.publish(publishTopic, publishMessage, pubOpts.opts, (err) => {
inFlightMessageCount -= 1
Expand Down
4 changes: 2 additions & 2 deletions cli/src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ declare global {

type MQTTVersion = 3 | 4 | 5

type Protocol = 'mqtt' | 'mqtts'
type Protocol = 'mqtt' | 'mqtts' | 'ws' | 'wss'

type QoS = 0 | 1 | 2

Expand Down Expand Up @@ -114,7 +114,7 @@ declare global {
messageInterval: number
limit: number
verbose: boolean
split: string
split?: boolean | string
}

type OmitSubscribeOptions = Omit<
Expand Down
26 changes: 5 additions & 21 deletions cli/src/utils/fileUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,31 +92,16 @@ const createNextNumberedFileName = (filePath: string): string => {
}
}

const fileDataSplitter = (data: string | Buffer, split: string): string[] => {
const fileDataSplitter = (data: string | Buffer, split: true | string): string[] => {
let defaultSplit = '\n'
if (split === true) {
split = defaultSplit
}
const stringData = data.toString('utf-8')
const splitRegex = new RegExp(split, 'g')
return stringData.split(splitRegex)
}

const getPublishMessageFromFile = async (
split: string,
dupSplitedMessageArr: string[],
fileData: string | Buffer,
meta: { total: number; fileRead: string },
): Promise<Buffer | string> => {
if (!split) {
return fileData
}
if (dupSplitedMessageArr.length === 0) {
await delay(1000)
signale.success(`All ${meta.total} messages from the ${meta.fileRead} have been successfully sent.`)
process.exit(0)
}

const unshiftedMessage = dupSplitedMessageArr.shift()
return Buffer.from(unshiftedMessage!)
}

export {
processPath,
getPathExtname,
Expand All @@ -129,5 +114,4 @@ export {
appendFile,
createNextNumberedFileName,
fileDataSplitter,
getPublishMessageFromFile,
}
4 changes: 0 additions & 4 deletions cli/src/utils/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,6 @@ const parseConnectOptions = (
connectOptions.ca = fs.readFileSync(ca)
}

if (key && cert && protocol !== 'mqtts') {
connectOptions.protocol = 'mqtts'
}

if (insecure) {
connectOptions.rejectUnauthorized = false
}
Expand Down
Loading