|  | 
|  | 1 | +/* eslint-disable no-console */ | 
|  | 2 | +/* eslint-disable @typescript-eslint/no-var-requires */ | 
|  | 3 | + | 
|  | 4 | +// run this file with ts-node: | 
|  | 5 | +// npx ts-node etc/sdam_viz.js -h | 
|  | 6 | + | 
|  | 7 | +const { MongoClient } = require('../src'); | 
|  | 8 | +const { now, calculateDurationInMs, arrayStrictEqual, errorStrictEqual } = require('../src/utils'); | 
|  | 9 | + | 
|  | 10 | +const util = require('util'); | 
|  | 11 | +const chalk = require('chalk'); | 
|  | 12 | +const argv = require('yargs') | 
|  | 13 | +  .usage('Usage: $0 [options] <connection string>') | 
|  | 14 | +  .demandCommand(1) | 
|  | 15 | +  .help('h') | 
|  | 16 | +  .describe('workload', 'Simulate a read workload') | 
|  | 17 | +  .describe('writeWorkload', 'Simulate a write workload') | 
|  | 18 | +  .describe('writeWorkloadInterval', 'Time interval between write workload write attempts') | 
|  | 19 | +  .describe('writeWorkloadSampleSize', 'Sample size between status display for write workload') | 
|  | 20 | +  .describe('legacy', 'Use the legacy topology types') | 
|  | 21 | +  .alias('l', 'legacy') | 
|  | 22 | +  .alias('w', 'workload') | 
|  | 23 | +  .alias('h', 'help').argv; | 
|  | 24 | + | 
|  | 25 | +function print(msg) { | 
|  | 26 | +  console.log(`${chalk.white(new Date().toISOString())} ${msg}`); | 
|  | 27 | +} | 
|  | 28 | + | 
|  | 29 | +const uri = argv._[0]; | 
|  | 30 | +const client = new MongoClient(uri); | 
|  | 31 | + | 
|  | 32 | +function diff(lhs, rhs, fields, comparator) { | 
|  | 33 | +  return fields.reduce((diff, field) => { | 
|  | 34 | +    if ((lhs[field] == null || rhs[field] == null) && field !== 'error') { | 
|  | 35 | +      return diff; | 
|  | 36 | +    } | 
|  | 37 | + | 
|  | 38 | +    if (!comparator(lhs[field], rhs[field])) { | 
|  | 39 | +      diff.push( | 
|  | 40 | +        `  ${field}: ${chalk.green(`${util.inspect(lhs[field])}`)} => ${chalk.green( | 
|  | 41 | +          `${util.inspect(rhs[field])}` | 
|  | 42 | +        )}` | 
|  | 43 | +      ); | 
|  | 44 | +    } | 
|  | 45 | + | 
|  | 46 | +    return diff; | 
|  | 47 | +  }, []); | 
|  | 48 | +} | 
|  | 49 | + | 
|  | 50 | +function serverDescriptionDiff(lhs, rhs) { | 
|  | 51 | +  const objectIdFields = ['electionId']; | 
|  | 52 | +  const arrayFields = ['hosts', 'tags']; | 
|  | 53 | +  const simpleFields = [ | 
|  | 54 | +    'type', | 
|  | 55 | +    'minWireVersion', | 
|  | 56 | +    'me', | 
|  | 57 | +    'setName', | 
|  | 58 | +    'setVersion', | 
|  | 59 | +    'electionId', | 
|  | 60 | +    'primary', | 
|  | 61 | +    'logicalSessionTimeoutMinutes' | 
|  | 62 | +  ]; | 
|  | 63 | + | 
|  | 64 | +  return diff(lhs, rhs, simpleFields, (x, y) => x === y) | 
|  | 65 | +    .concat(diff(lhs, rhs, ['error'], (x, y) => errorStrictEqual(x, y))) | 
|  | 66 | +    .concat(diff(lhs, rhs, arrayFields, (x, y) => arrayStrictEqual(x, y))) | 
|  | 67 | +    .concat(diff(lhs, rhs, objectIdFields, (x, y) => x.equals(y))) | 
|  | 68 | +    .join(',\n'); | 
|  | 69 | +} | 
|  | 70 | + | 
|  | 71 | +function topologyDescriptionDiff(lhs, rhs) { | 
|  | 72 | +  const simpleFields = [ | 
|  | 73 | +    'type', | 
|  | 74 | +    'setName', | 
|  | 75 | +    'maxSetVersion', | 
|  | 76 | +    'stale', | 
|  | 77 | +    'compatible', | 
|  | 78 | +    'compatibilityError', | 
|  | 79 | +    'logicalSessionTimeoutMinutes', | 
|  | 80 | +    'error', | 
|  | 81 | +    'commonWireVersion' | 
|  | 82 | +  ]; | 
|  | 83 | + | 
|  | 84 | +  return diff(lhs, rhs, simpleFields, (x, y) => x === y).join(',\n'); | 
|  | 85 | +} | 
|  | 86 | + | 
|  | 87 | +function visualizeMonitoringEvents(client) { | 
|  | 88 | +  function print(msg) { | 
|  | 89 | +    console.error(`${chalk.white(new Date().toISOString())} ${msg}`); | 
|  | 90 | +  } | 
|  | 91 | + | 
|  | 92 | +  client.on('serverHeartbeatStarted', event => | 
|  | 93 | +    print(`${chalk.yellow('heartbeat')} ${chalk.bold('started')} host: '${event.connectionId}`) | 
|  | 94 | +  ); | 
|  | 95 | + | 
|  | 96 | +  client.on('serverHeartbeatSucceeded', event => | 
|  | 97 | +    print( | 
|  | 98 | +      `${chalk.yellow('heartbeat')} ${chalk.green('succeeded')} host: '${ | 
|  | 99 | +        event.connectionId | 
|  | 100 | +      }' ${chalk.gray(`(${event.duration} ms)`)}` | 
|  | 101 | +    ) | 
|  | 102 | +  ); | 
|  | 103 | + | 
|  | 104 | +  client.on('serverHeartbeatFailed', event => | 
|  | 105 | +    print( | 
|  | 106 | +      `${chalk.yellow('heartbeat')} ${chalk.red('failed')} host: '${ | 
|  | 107 | +        event.connectionId | 
|  | 108 | +      }' ${chalk.gray(`(${event.duration} ms)`)}` | 
|  | 109 | +    ) | 
|  | 110 | +  ); | 
|  | 111 | + | 
|  | 112 | +  // server information | 
|  | 113 | +  client.on('serverOpening', event => { | 
|  | 114 | +    print( | 
|  | 115 | +      `${chalk.cyan('server')} [${event.address}] ${chalk.bold('opening')} in topology#${ | 
|  | 116 | +        event.topologyId | 
|  | 117 | +      }` | 
|  | 118 | +    ); | 
|  | 119 | +  }); | 
|  | 120 | + | 
|  | 121 | +  client.on('serverClosed', event => { | 
|  | 122 | +    print( | 
|  | 123 | +      `${chalk.cyan('server')} [${event.address}] ${chalk.bold('closed')} in topology#${ | 
|  | 124 | +        event.topologyId | 
|  | 125 | +      }` | 
|  | 126 | +    ); | 
|  | 127 | +  }); | 
|  | 128 | + | 
|  | 129 | +  client.on('serverDescriptionChanged', event => { | 
|  | 130 | +    print(`${chalk.cyan('server')} [${event.address}] changed:`); | 
|  | 131 | +    console.error(serverDescriptionDiff(event.previousDescription, event.newDescription)); | 
|  | 132 | +  }); | 
|  | 133 | + | 
|  | 134 | +  // topology information | 
|  | 135 | +  client.on('topologyOpening', event => { | 
|  | 136 | +    print(`${chalk.magenta('topology')} adding topology#${event.topologyId}`); | 
|  | 137 | +  }); | 
|  | 138 | + | 
|  | 139 | +  client.on('topologyClosed', event => { | 
|  | 140 | +    print(`${chalk.magenta('topology')} removing topology#${event.topologyId}`); | 
|  | 141 | +  }); | 
|  | 142 | + | 
|  | 143 | +  client.on('topologyDescriptionChanged', event => { | 
|  | 144 | +    const diff = topologyDescriptionDiff(event.previousDescription, event.newDescription); | 
|  | 145 | +    if (diff !== '') { | 
|  | 146 | +      print(`${chalk.magenta('topology')} [topology#${event.topologyId}] changed:`); | 
|  | 147 | +      console.error(diff); | 
|  | 148 | +    } | 
|  | 149 | +  }); | 
|  | 150 | +} | 
|  | 151 | + | 
|  | 152 | +async function run() { | 
|  | 153 | +  print(`connecting to: ${chalk.bold(uri)}`); | 
|  | 154 | + | 
|  | 155 | +  visualizeMonitoringEvents(client); | 
|  | 156 | +  await client.connect(); | 
|  | 157 | + | 
|  | 158 | +  if (argv.workload) { | 
|  | 159 | +    scheduleWorkload(client); | 
|  | 160 | +  } | 
|  | 161 | + | 
|  | 162 | +  if (argv.writeWorkload) { | 
|  | 163 | +    scheduleWriteWorkload(client); | 
|  | 164 | +  } | 
|  | 165 | +} | 
|  | 166 | + | 
|  | 167 | +let workloadTimer; | 
|  | 168 | +let workloadCounter = 0; | 
|  | 169 | +let workloadInterrupt = false; | 
|  | 170 | +async function scheduleWorkload(client) { | 
|  | 171 | +  if (!workloadInterrupt) { | 
|  | 172 | +    // immediately reschedule work | 
|  | 173 | +    workloadTimer = setTimeout(() => scheduleWorkload(client), 7000); | 
|  | 174 | +  } | 
|  | 175 | + | 
|  | 176 | +  const currentWorkload = workloadCounter++; | 
|  | 177 | + | 
|  | 178 | +  try { | 
|  | 179 | +    print(`${chalk.yellow(`workload#${currentWorkload}`)} issuing find...`); | 
|  | 180 | +    const result = await client | 
|  | 181 | +      .db('test') | 
|  | 182 | +      .collection('test') | 
|  | 183 | +      .find({}, { socketTimeoutMS: 2000 }) | 
|  | 184 | +      .limit(1) | 
|  | 185 | +      .toArray(); | 
|  | 186 | + | 
|  | 187 | +    print( | 
|  | 188 | +      `${chalk.yellow(`workload#${currentWorkload}`)} find completed: ${JSON.stringify(result)}` | 
|  | 189 | +    ); | 
|  | 190 | +  } catch (e) { | 
|  | 191 | +    print(`${chalk.yellow(`workload#${currentWorkload}`)} find failed: ${e.message}`); | 
|  | 192 | +  } | 
|  | 193 | +} | 
|  | 194 | + | 
|  | 195 | +let writeWorkloadTimer; | 
|  | 196 | +let writeWorkloadCounter = 0; | 
|  | 197 | +let averageWriteMS = 0; | 
|  | 198 | +let completedWriteWorkloads = 0; | 
|  | 199 | +const writeWorkloadSampleSize = argv.writeWorkloadSampleSize || 100; | 
|  | 200 | +const writeWorkloadInterval = argv.writeWorkloadInterval || 100; | 
|  | 201 | +async function scheduleWriteWorkload(client) { | 
|  | 202 | +  if (!workloadInterrupt) { | 
|  | 203 | +    // immediately reschedule work | 
|  | 204 | +    writeWorkloadTimer = setTimeout(() => scheduleWriteWorkload(client), writeWorkloadInterval); | 
|  | 205 | +  } | 
|  | 206 | + | 
|  | 207 | +  const currentWriteWorkload = writeWorkloadCounter++; | 
|  | 208 | + | 
|  | 209 | +  try { | 
|  | 210 | +    const start = now(); | 
|  | 211 | +    await client.db('test').collection('test').insertOne({ a: 42 }); | 
|  | 212 | +    averageWriteMS = 0.2 * calculateDurationInMs(start) + 0.8 * averageWriteMS; | 
|  | 213 | + | 
|  | 214 | +    completedWriteWorkloads++; | 
|  | 215 | +    if (completedWriteWorkloads % writeWorkloadSampleSize === 0) { | 
|  | 216 | +      print( | 
|  | 217 | +        `${chalk.yellow( | 
|  | 218 | +          `workload#${currentWriteWorkload}` | 
|  | 219 | +        )} completed ${completedWriteWorkloads} writes with average time: ${averageWriteMS}` | 
|  | 220 | +      ); | 
|  | 221 | +    } | 
|  | 222 | +  } catch (e) { | 
|  | 223 | +    print(`${chalk.yellow(`workload#${currentWriteWorkload}`)} write failed: ${e.message}`); | 
|  | 224 | +  } | 
|  | 225 | +} | 
|  | 226 | + | 
|  | 227 | +let exitRequestCount = 0; | 
|  | 228 | +process.on('SIGINT', async function () { | 
|  | 229 | +  exitRequestCount++; | 
|  | 230 | +  if (exitRequestCount > 3) { | 
|  | 231 | +    console.log('force quitting...'); | 
|  | 232 | +    process.exit(1); | 
|  | 233 | +  } | 
|  | 234 | + | 
|  | 235 | +  workloadInterrupt = true; | 
|  | 236 | +  clearTimeout(workloadTimer); | 
|  | 237 | +  clearTimeout(writeWorkloadTimer); | 
|  | 238 | +  await client.close(); | 
|  | 239 | +}); | 
|  | 240 | + | 
|  | 241 | +run().catch(error => console.log('Caught', error)); | 
0 commit comments