Skip to content

Commit

Permalink
Run format for SB, EH and Core AMQP (Azure#14323)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramya-rao-a authored and vindicatesociety committed Apr 26, 2021
1 parent d68cc4b commit 603e521
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 120 deletions.
7 changes: 1 addition & 6 deletions sdk/core/core-amqp/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,7 @@ export function isSystemError(err: unknown): err is NetworkSystemError {
*/
function isBrowserWebsocketError(err: any): boolean {
let result: boolean = false;
if (
!isNode &&
self &&
err.type === "error" &&
err.target instanceof (self as any).WebSocket
) {
if (!isNode && self && err.type === "error" && err.target instanceof (self as any).WebSocket) {
result = true;
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { packageJsonInfo } from "../../../src/util/constants";
// following test is in place to ensure the values in package.json and in this file are consistent
describe("Ensure package name and version are consistent in SDK and package.json", function(): void {
it("Ensure constants.ts file is consistent with package.json", () => {
const packageJsonFilePath = path.join(__dirname, "..", "..","..", "..", "package.json");
const packageJsonFilePath = path.join(__dirname, "..", "..", "..", "..", "package.json");
const rawFileContents = fs.readFileSync(packageJsonFilePath, { encoding: "utf-8" });
const packageJsonContents = JSON.parse(rawFileContents);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
// Licensed under the MIT license.

import { CloseReason, EventHubProducerClient, ReceivedEventData } from "../../../src";
import {
PartitionContext,
SubscriptionEventHandlers
} from "../../../src";
import { PartitionContext, SubscriptionEventHandlers } from "../../../src";
import chai from "chai";
import { delay } from "@azure/core-amqp";

Expand Down
6 changes: 5 additions & 1 deletion sdk/servicebus/service-bus/rollup.base.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ export function browserConfig(test = false) {
baseConfig.onwarn = ignoreKnownWarnings;

if (test) {
baseConfig.input = ["dist-esm/test/public/**/*.spec.js", "dist-esm/test/internal/*.spec.js", "dist-esm/test/internal/unit/*.spec.js"];
baseConfig.input = [
"dist-esm/test/public/**/*.spec.js",
"dist-esm/test/internal/*.spec.js",
"dist-esm/test/internal/unit/*.spec.js"
];
baseConfig.plugins.unshift(multiEntry({ exports: false }));
baseConfig.output.file = "test-browser/index.js";

Expand Down
14 changes: 12 additions & 2 deletions sdk/servicebus/service-bus/test/internal/smoketest.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { ServiceBusReceivedMessage, ServiceBusReceiver, ServiceBusMessage, delay, ProcessErrorArgs, ServiceBusSender } from "../../src";
import {
ServiceBusReceivedMessage,
ServiceBusReceiver,
ServiceBusMessage,
delay,
ProcessErrorArgs,
ServiceBusSender
} from "../../src";
import { TestClientType } from "../public/utils/testUtils";
import chai from "chai";
import chaiAsPromised from "chai-as-promised";
import { getEntityNameFromConnectionString } from "../../src/constructorHelpers";
import { ServiceBusClientForTests, createServiceBusClientForTests } from "../public/utils/testutils2";
import {
ServiceBusClientForTests,
createServiceBusClientForTests
} from "../public/utils/testutils2";
chai.use(chaiAsPromised);
const assert = chai.assert;

Expand Down
169 changes: 91 additions & 78 deletions sdk/servicebus/service-bus/test/stress/scenarioLongRunning.ts
Original file line number Diff line number Diff line change
@@ -1,105 +1,118 @@
import {hostname} from "os";
import { captureConsoleOutputToAppInsights, defaultClient, SBStressTestsBase } from "./stressTestsBase";
import { hostname } from "os";
import {
captureConsoleOutputToAppInsights,
defaultClient,
SBStressTestsBase
} from "./stressTestsBase";
import { AbortController, AbortSignalLike } from "@azure/abort-controller";
import { ServiceBusClient, ServiceBusSender } from "@azure/service-bus";
import { v4 as uuidv4 } from "uuid";

captureConsoleOutputToAppInsights();

const stressTest = new SBStressTestsBase({
testName: "longRunning",
snapshotFocus: [
"send-info",
"receive-info"
]
testName: "longRunning",
snapshotFocus: ["send-info", "receive-info"]
});

async function looper(fn: () => Promise<void>, delay: number, abortSignal: AbortSignalLike) {
const timeout = () => new Promise((resolve) => setTimeout(() => resolve(true), delay));
const timeout = () => new Promise((resolve) => setTimeout(() => resolve(true), delay));

while (!abortSignal.aborted && await timeout()) {
await fn();
}
while (!abortSignal.aborted && (await timeout())) {
await fn();
}
}

async function sendMessagesForever(clientForSender: ServiceBusClient, abortSignal: AbortSignalLike) {
console.log(`Started message sending`);
async function sendMessagesForever(
clientForSender: ServiceBusClient,
abortSignal: AbortSignalLike
) {
console.log(`Started message sending`);

let sender: ServiceBusSender | undefined;
let sender: ServiceBusSender | undefined;

return looper(async () => {
if (abortSignal.aborted) {
console.log(`Aborting sending because of abortSignal`);
return;
}
return looper(
async () => {
if (abortSignal.aborted) {
console.log(`Aborting sending because of abortSignal`);
return;
}

try {
if (sender == null) {
sender = clientForSender.createSender(stressTest.queueName);
}

const messagesToSend = [{
messageId: uuidv4(),
body: `Message: ${Date.now()}`
}];

stressTest.trackSentMessages(messagesToSend);
await sender.sendMessages(messagesToSend);
} catch (err) {
console.log(`Sending message failed: `, err);
stressTest.trackError("send", err);
sender = undefined;
try {
if (sender == null) {
sender = clientForSender.createSender(stressTest.queueName);
}
}, 1000, abortSignal);

const messagesToSend = [
{
messageId: uuidv4(),
body: `Message: ${Date.now()}`
}
];

stressTest.trackSentMessages(messagesToSend);
await sender.sendMessages(messagesToSend);
} catch (err) {
console.log(`Sending message failed: `, err);
stressTest.trackError("send", err);
sender = undefined;
}
},
1000,
abortSignal
);
}

async function main() {
const abortController = new AbortController();
const abortSignal = abortController.signal;

await stressTest.init();

console.log(`Starting with hostname ${hostname}`);

defaultClient.trackEvent({
name: "ApplicationStart"
});

const clientForReceiver = stressTest.createServiceBusClient();

const receiver = clientForReceiver.createReceiver(stressTest.queueName, {
receiveMode: "peekLock"
});

console.log(`Subscribing...`);

const subscription = receiver.subscribe({
processMessage: async (msg) => {
stressTest.addReceivedMessage([msg]);
await stressTest.completeMessage(receiver, msg);
},
processError: async (args) => {
console.log(`subscribe error:`, args);
stressTest.trackError("receive", args.error);
}
}, {
autoCompleteMessages: false,
maxConcurrentCalls: 10
});
const abortController = new AbortController();
const abortSignal = abortController.signal;

await stressTest.init();

console.log(`Starting with hostname ${hostname}`);

defaultClient.trackEvent({
name: "ApplicationStart"
});

const clientForReceiver = stressTest.createServiceBusClient();

const receiver = clientForReceiver.createReceiver(stressTest.queueName, {
receiveMode: "peekLock"
});

console.log(`Subscribing...`);

const subscription = receiver.subscribe(
{
processMessage: async (msg) => {
stressTest.addReceivedMessage([msg]);
await stressTest.completeMessage(receiver, msg);
},
processError: async (args) => {
console.log(`subscribe error:`, args);
stressTest.trackError("receive", args.error);
}
},
{
autoCompleteMessages: false,
maxConcurrentCalls: 10
}
);

const clientForSender = stressTest.createServiceBusClient();
const clientForSender = stressTest.createServiceBusClient();

await sendMessagesForever(clientForSender, abortSignal);
defaultClient.flush();
await sendMessagesForever(clientForSender, abortSignal);
defaultClient.flush();

await subscription.close();
await clientForReceiver.close();
await clientForSender.close();
await subscription.close();
await clientForReceiver.close();
await clientForSender.close();

await stressTest.end();
await stressTest.end();
}

main().catch((err) => {
console.error(err);
process.exit(1);
main().catch((err) => {
console.error(err);
process.exit(1);
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { ServiceBusClient, ServiceBusReceivedMessage, ServiceBusReceiver } from "@azure/service-bus";
import {
ServiceBusClient,
ServiceBusReceivedMessage,
ServiceBusReceiver
} from "@azure/service-bus";
import { SBStressTestsBase } from "./stressTestsBase";
import { delay } from "rhea-promise";
import parsedArgs from "minimist";
Expand Down Expand Up @@ -56,7 +60,7 @@ export async function scenarioPeekMessages() {
const sbClient = new ServiceBusClient(connectionString);

await stressBase.init(undefined, undefined, testOptions);

const sender = sbClient.createSender(stressBase.queueName);
let receiver: ServiceBusReceiver;

Expand Down
Loading

0 comments on commit 603e521

Please sign in to comment.