Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Fix memory leak in EHBufferedProducerClient #26748

Merged
merged 55 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
bdf0f4f
harshan-eh-Aug-8-issue-25426
HarshaNalluru Aug 9, 2023
4be66ce
generated
HarshaNalluru Aug 9, 2023
9d72703
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-js into…
HarshaNalluru Aug 21, 2023
11cb431
logs in the test
HarshaNalluru Aug 23, 2023
5efce29
spotted the problem
HarshaNalluru Aug 30, 2023
86380cc
looks like this works
HarshaNalluru Sep 5, 2023
5f46af4
clean up console.logs
HarshaNalluru Sep 5, 2023
724c393
clean up
HarshaNalluru Sep 5, 2023
ab9bbd4
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-js into…
HarshaNalluru Sep 5, 2023
255931e
racePromisesAndAbortLosers
HarshaNalluru Sep 8, 2023
c05f909
stress
HarshaNalluru Sep 8, 2023
a2396ab
core-util abort race losers abstraction
HarshaNalluru Sep 8, 2023
f5c7d4d
import from core-util
HarshaNalluru Sep 8, 2023
a431dc0
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-js into…
HarshaNalluru Sep 8, 2023
bf0402b
api report
HarshaNalluru Sep 8, 2023
43e16f3
rushx format
HarshaNalluru Sep 8, 2023
7a509b2
dockerfile
HarshaNalluru Sep 8, 2023
eb265c7
stress test deps
HarshaNalluru Sep 8, 2023
f2e3a14
Update sdk/core/core-util/CHANGELOG.md
HarshaNalluru Sep 18, 2023
2fc710b
Update sdk/core/core-util/src/aborterUtils.ts
HarshaNalluru Sep 18, 2023
2655813
address feedback and updates to the tests
HarshaNalluru Sep 19, 2023
f9ac4ca
Merge branch 'harshan/issue/25426' of https://github.com/harshanallur…
HarshaNalluru Sep 19, 2023
e80a1fa
swap count++
HarshaNalluru Sep 19, 2023
b570cc2
cancelablePromiseRace types
HarshaNalluru Sep 19, 2023
cd5a505
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-js into…
HarshaNalluru Sep 20, 2023
bf625f7
move stress test updates to a new PR
HarshaNalluru Sep 20, 2023
2d736fa
retain createAbortablePromise file to make less updates with git diff
HarshaNalluru Sep 20, 2023
59c7544
adds tests and format
HarshaNalluru Sep 21, 2023
2bc5b7f
disclaimer
HarshaNalluru Sep 21, 2023
24f4164
move options?.abortSignal?.removeEventListener to finally
HarshaNalluru Sep 21, 2023
7fa08b9
parent abortSignal listener
HarshaNalluru Sep 21, 2023
d256136
builds
HarshaNalluru Sep 21, 2023
b75085a
update date to tomorrow
HarshaNalluru Sep 21, 2023
36138e1
format
HarshaNalluru Sep 21, 2023
8a8ad5b
remove stress test
HarshaNalluru Sep 21, 2023
9fe0515
remove stress test - fix commit
HarshaNalluru Sep 21, 2023
bfa9ace
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-js into…
HarshaNalluru Sep 21, 2023
76f31f3
simplify with a complicated type
HarshaNalluru Sep 21, 2023
a2fb054
API report
HarshaNalluru Sep 21, 2023
9808484
builds
HarshaNalluru Sep 21, 2023
dad44d5
format
HarshaNalluru Sep 21, 2023
bcf38c4
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-js into…
HarshaNalluru Sep 21, 2023
674b539
lock file
HarshaNalluru Sep 21, 2023
6a79e31
fix build core-xml
HarshaNalluru Sep 21, 2023
b94fed1
Merge branch 'harshan/core-xml-fix' of https://github.com/harshanallu…
HarshaNalluru Sep 21, 2023
c89a5dc
Update sdk/eventhub/event-hubs/src/impl/awaitableQueue.ts
HarshaNalluru Sep 21, 2023
324a194
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-js into…
HarshaNalluru Sep 21, 2023
fd34e4d
get rid of the clocks and math
HarshaNalluru Sep 21, 2023
b22b5a8
Merge branch 'harshan/issue/25426' of https://github.com/harshanallur…
HarshaNalluru Sep 21, 2023
d0f87f1
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-js into…
HarshaNalluru Sep 21, 2023
86bc126
revert core-xml changes in this PR
HarshaNalluru Sep 21, 2023
bc363ea
revert core-xml and lock file changes
HarshaNalluru Sep 21, 2023
b865eee
Update sdk/core/core-util/CHANGELOG.md
HarshaNalluru Sep 21, 2023
114e5be
format
HarshaNalluru Sep 21, 2023
5b14885
Update sdk/eventhub/event-hubs/src/impl/awaitableQueue.ts
HarshaNalluru Sep 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions sdk/core/core-util/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
# Release History

## 1.4.1 (Unreleased)
## 1.5.0 (2023-09-21)

### Features Added

### Breaking Changes

### Bugs Fixed

### Other Changes
- Adds helper method `cancelablePromiseRace`, an abstraction that leverages `"promise.race()"` and aborts the losers of the race as soon as the first promise settles.
[PR #26748](https://github.com/Azure/azure-sdk-for-js/pull/26748)

## 1.4.0 (2023-08-03)

Expand Down
2 changes: 1 addition & 1 deletion sdk/core/core-util/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@azure/core-util",
"version": "1.4.1",
"version": "1.5.0",
"description": "Core library for shared utility methods",
"sdk-type": "client",
"main": "dist/index.js",
Expand Down
24 changes: 18 additions & 6 deletions sdk/core/core-util/review/core-util.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@

import { AbortSignalLike } from '@azure/abort-controller';

// @public
export type AbortablePromiseBuilder<T> = (abortOptions: {
abortSignal?: AbortSignalLike;
}) => Promise<T>;

// @public
export interface AbortOptions {
abortErrorMsg?: string;
abortSignal?: AbortSignalLike;
}

// @public
export function cancelablePromiseRace<T extends unknown[]>(abortablePromiseBuilders: AbortablePromiseBuilder<T[number]>[], options?: {
abortSignal?: AbortSignalLike;
}): Promise<T[number]>;

// @public
export function computeSha256Hash(content: string, encoding: "base64" | "hex"): Promise<string>;

Expand All @@ -16,19 +32,15 @@ export function computeSha256Hmac(key: string, stringToSign: string, encoding: "
export function createAbortablePromise<T>(buildPromise: (resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: any) => void) => void, options?: CreateAbortablePromiseOptions): Promise<T>;

// @public
export interface CreateAbortablePromiseOptions {
abortErrorMsg?: string;
abortSignal?: AbortSignalLike;
export interface CreateAbortablePromiseOptions extends AbortOptions {
cleanupBeforeAbort?: () => void;
}

// @public
export function delay(timeInMs: number, options?: DelayOptions): Promise<void>;

// @public
export interface DelayOptions {
abortErrorMsg?: string;
abortSignal?: AbortSignalLike;
export interface DelayOptions extends AbortOptions {
}

// @public
Expand Down
47 changes: 47 additions & 0 deletions sdk/core/core-util/src/aborterUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { AbortSignalLike, AbortController } from "@azure/abort-controller";

/**
* Options related to abort controller.
*/
export interface AbortOptions {
/**
* The abortSignal associated with containing operation.
*/
abortSignal?: AbortSignalLike;
/**
* The abort error message associated with containing operation.
*/
abortErrorMsg?: string;
}

/**
* Represents a function that returns a promise that can be aborted.
*/
export type AbortablePromiseBuilder<T> = (abortOptions: {
abortSignal?: AbortSignalLike;
}) => Promise<T>;

/**
* promise.race() wrapper that aborts rest of promises as soon as the first promise settles.
*/
export async function cancelablePromiseRace<T extends unknown[]>(
abortablePromiseBuilders: AbortablePromiseBuilder<T[number]>[],
options?: { abortSignal?: AbortSignalLike }
): Promise<T[number]> {
const aborter = new AbortController();
function abortHandler(): void {
aborter.abort();
}
options?.abortSignal?.addEventListener("abort", abortHandler);
try {
return await Promise.race(
abortablePromiseBuilders.map((p) => p({ abortSignal: aborter.signal }))
);
} finally {
aborter.abort();
options?.abortSignal?.removeEventListener("abort", abortHandler);
}
}
9 changes: 3 additions & 6 deletions sdk/core/core-util/src/createAbortablePromise.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { AbortError } from "@azure/abort-controller";
import { AbortOptions } from "./aborterUtils";

/**
* Options for the createAbortablePromise function.
*/
export interface CreateAbortablePromiseOptions {
export interface CreateAbortablePromiseOptions extends AbortOptions {
/** A function to be called if the promise was aborted */
cleanupBeforeAbort?: () => void;
/** An abort signal */
abortSignal?: AbortSignalLike;
/** An abort error message */
abortErrorMsg?: string;
}

/**
Expand Down
13 changes: 2 additions & 11 deletions sdk/core/core-util/src/delay.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { AbortSignalLike } from "@azure/abort-controller";
import { AbortOptions } from "./aborterUtils";
import { createAbortablePromise } from "./createAbortablePromise";

const StandardAbortMessage = "The delay was aborted.";

/**
* Options for support abort functionality for the delay method
*/
export interface DelayOptions {
/**
* The abortSignal associated with containing operation.
*/
abortSignal?: AbortSignalLike;
/**
* The abort error message associated with containing operation.
*/
abortErrorMsg?: string;
}
export interface DelayOptions extends AbortOptions {}

/**
* A wrapper for setTimeout that resolves a promise after timeInMs milliseconds.
Expand Down
1 change: 1 addition & 0 deletions sdk/core/core-util/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

export { delay, DelayOptions } from "./delay";
export { AbortOptions, cancelablePromiseRace, AbortablePromiseBuilder } from "./aborterUtils";
export { createAbortablePromise, CreateAbortablePromiseOptions } from "./createAbortablePromise";
export { getRandomIntegerInclusive } from "./random";
export { isObject, UnknownObject } from "./object";
Expand Down
159 changes: 159 additions & 0 deletions sdk/core/core-util/test/public/aborterUtils.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import * as sinon from "sinon";
import { AbortController, AbortSignalLike } from "@azure/abort-controller";
import chai from "chai";
import chaiAsPromised from "chai-as-promised";
import { cancelablePromiseRace, createAbortablePromise } from "../../src";

chai.use(chaiAsPromised);
const { assert } = chai;

describe("createAbortablePromise", function () {
let token: ReturnType<typeof setTimeout>;
const delayTime = 2500;
const createPromise = ({
abortSignal,
abortErrorMsg,
}: { abortSignal?: AbortSignalLike; abortErrorMsg?: string } = {}): Promise<unknown> =>
createAbortablePromise(
(resolve) => {
token = setTimeout(resolve, delayTime);
},
{
cleanupBeforeAbort: () => clearTimeout(token),
abortSignal,
abortErrorMsg,
}
);
afterEach(function () {
sinon.restore();
});

it("should resolve if not aborted nor rejected", async function () {
const clock = sinon.useFakeTimers();
const promise = createPromise();
const time = await clock.nextAsync();
clock.restore();
assert.strictEqual(time, delayTime);
await assert.isFulfilled(promise);
});

it("should reject when aborted", async function () {
const aborter = new AbortController();
const abortErrorMsg = "The test operation was aborted.";
const promise = createPromise({
abortSignal: aborter.signal,
abortErrorMsg,
});
aborter.abort();
await assert.isRejected(promise, abortErrorMsg);
});
});

describe("cancelablePromiseRace", function () {
let function1Aborted = false;
let function2Aborted = false;
let function3Aborted = false;
const function1Delay = 100;
let function2Delay = 200;
const function3Delay = 2000; // Default: function1Delay < function2Delay < function3Delay
const function2Message = "function 2 is rejected";
const function3Message = "function 3 is rejected";

const function1 = async (abortOptions: { abortSignal?: AbortSignalLike }): Promise<number> => {
let token: ReturnType<typeof setTimeout>;
return createAbortablePromise(
(resolve) => {
token = setTimeout(resolve, function1Delay);
},
{
cleanupBeforeAbort: () => {
clearTimeout(token);
function1Aborted = true;
},
abortSignal: abortOptions.abortSignal,
}
);
};

const function2 = async (abortOptions: { abortSignal?: AbortSignalLike }): Promise<string> => {
let token: ReturnType<typeof setTimeout>;
return createAbortablePromise(
(reject) => {
token = setTimeout(() => reject(function2Message), function2Delay);
},
{
cleanupBeforeAbort: () => {
clearTimeout(token);
function2Aborted = true;
},
abortSignal: abortOptions.abortSignal,
}
);
};

const function3 = async (abortOptions: { abortSignal?: AbortSignalLike }): Promise<void> => {
let token: ReturnType<typeof setTimeout>;
return createAbortablePromise(
(resolve, reject) => {
token =
Math.random() < 0.5
? setTimeout(resolve, function3Delay)
: setTimeout(() => reject(function3Message), function3Delay);
},
{
cleanupBeforeAbort: () => {
clearTimeout(token);
function3Aborted = true;
},
abortSignal: abortOptions.abortSignal,
}
);
};

afterEach(function () {
// reset to default values
function1Aborted = false;
function2Aborted = false;
function2Delay = 200;
function3Aborted = false;
});

it("should resolve with the first promise that resolves, abort the rest", async function () {
await cancelablePromiseRace<[number, string, void]>([function1, function2, function3]); // 1 finishes first, 2&3 are aborted
assert.isFalse(function1Aborted); // checks 1 is not aborted
assert.isTrue(function2Aborted); // checks 2 is aborted
assert.isTrue(function3Aborted); // checks 3 is aborted
});

it("should reject with the first promise that rejects, abort the rest", async function () {
function2Delay = function1Delay / 2;
assert.strictEqual(
await cancelablePromiseRace<[number, string, void]>([function1, function2, function3]),
function2Message
); // 2 rejects and finishes first, 1&3 are aborted
assert.isTrue(function1Aborted); // checks 1 is aborted
assert.isFalse(function2Aborted); // checks 2 is not aborted
assert.isTrue(function3Aborted); // checks 3 is aborted
});

it("should respect the abort signal supplied", async function () {
const aborter = new AbortController();
setTimeout(() => aborter.abort(), function1Delay / 2);
let errorThrown = false;
try {
await cancelablePromiseRace<[number, string, void]>([function1, function2, function3], {
abortSignal: aborter.signal,
}); // all are aborted
} catch (error) {
errorThrown = true;
assert.strictEqual((error as { message: string }).message, "The operation was aborted.");
}
assert.isTrue(errorThrown);
assert.isTrue(function1Aborted); // checks 1 is aborted
assert.isTrue(function2Aborted); // checks 2 is aborted
assert.isTrue(function3Aborted); // checks 3 is aborted
});
});
53 changes: 0 additions & 53 deletions sdk/core/core-util/test/public/createAbortablePromise.spec.ts

This file was deleted.

Loading
Loading